This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-06-03
Channels
- # announcements (12)
- # beginners (44)
- # boot (27)
- # calva (73)
- # cider (1)
- # clj-kondo (9)
- # cljdoc (9)
- # cljs-dev (15)
- # cljsrn (6)
- # clojure (90)
- # clojure-dev (5)
- # clojure-europe (4)
- # clojure-ireland (3)
- # clojure-italy (22)
- # clojure-mexico (2)
- # clojure-nl (8)
- # clojure-uk (32)
- # clojurescript (12)
- # core-async (2)
- # cursive (16)
- # data-science (10)
- # datascript (3)
- # datomic (44)
- # emacs (17)
- # events (4)
- # graalvm (1)
- # hoplon (5)
- # jackdaw (17)
- # keechma (11)
- # nrepl (7)
- # off-topic (24)
- # re-frame (19)
- # reitit (4)
- # rewrite-clj (2)
- # robots (9)
- # shadow-cljs (20)
- # sql (12)
- # testing (4)
- # tools-deps (23)
- # vim (55)
Are there any guides on how to create a consumer? I currently get Missing required configuration "key.deserializer" which has no default value.
(defn -main
[& _]
(echo-environment)
(with-open [consumer (-> (jackdaw-client/consumer consumer-config)
(jackdaw-client/subscribe [my-topic]))]
(doseq [message (jackdaw-log/log consumer (-> env :topic :poll-interval-ms))]
(process/foreach message))))
(def consumer-config
(merge (-> env :broker)
{"key.deserializer" (.getName StringDeserializer)
"value.deserializer" (.getName StringDeserializer)}))
(:require [my-foo-process :as process]
[jackdaw.client.log :as jackdaw-log]
[jackdaw.client :as jackdaw-client]
[jackdaw.serdes.json :as json-serde]
[config.core :refer [env]]
[clojure.pprint :refer [pprint]])
(:import [org.apache.kafka.common.serialization Serdes StringDeserializer])
(:gen-class))
I use config.core to store in a config.edn values, such as :topic, :poll-interval-ms and :broker
Ah yeah, thanks for the example. ✌️
it's s bit, bitty, since it's copy&pasta from a program in use, so can't show company stuff
Still not 100% about when to use value-serdes and key-serdes in the client.consumer api though 😄
I know the java api for kafka, just something I dont want to use 🙂
nvm I figured it out now
Do you process your messages "async" with core.async @dharrigan?