This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
playing around with https://github.com/Blizzard/node-rdkafka
(defn ->producer
  "Creates a node-rdkafka Producer from `kafka-config` and starts connecting it.

  Returns a promise that resolves with the producer once its \"ready\" event
  fires, or rejects with the error passed to the first \"event.error\" event
  seen before that. Delivery reports (or their errors) are printed."
  []
  (let [producer   (Producer. kafka-config)
        ;; A single deferred carries both outcomes; whichever of
        ;; resolve!/reject! runs first settles it.
        connection (p/deferred)]
    ;; Listeners are registered by calling the emitter's `on` method;
    ;; `.-on` would be a property access and cannot take an event name.
    (.on producer "ready"
         (fn []
           (js/console.log "Producer is ready")
           (p/resolve! connection producer)))
    (.on producer "delivery-report"
         (fn [err report]
           (if err
             (println (js->clj err))
             (println (js->clj report)))))
    (.on producer "event.error"
         (fn [err]
           (js/console.log "event.error" err)
           (p/reject! connection err)))
    (.connect producer)
    ;; Poll periodically so queued delivery reports get emitted
    ;; (interval presumably in milliseconds -- confirm against node-rdkafka docs).
    (.setPollInterval producer 100)
    connection))
(defn create-producer
  "Creates a node-rdkafka Producer from `kafka-config` and starts connecting it.

  Returns a promise that resolves with the producer when its \"ready\" event
  fires, or rejects with the error from an \"event.error\" event that arrives
  before the promise has settled. Each delivery report is printed with its
  key and value; delivery errors are logged as warnings."
  []
  (let [producer   (Producer. kafka-config)
        connection (p/deferred)]
    (.on producer "ready"
         (fn [] (p/resolve! connection producer)))
    (.on producer "delivery-report"
         (fn [err report]
           (if err
             (js/console.warn "Delivery error: " err)
             ;; js->clj takes its options as keyword arguments, not as a map.
             (let [{:keys [key value]} (js->clj report :keywordize-keys true)]
               (println "Delivery data - key:" (str key) "value:" (str value))))))
    (.on producer "event.error"
         (fn [err] (p/reject! connection err)))
    (.connect producer)
    ;; Poll periodically so queued delivery reports get emitted
    ;; (interval presumably in milliseconds -- confirm against node-rdkafka docs).
    (.setPollInterval producer 100)
    connection))
👍 2