This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-04-19
I'm trying to use the Kafka producer and consumer concept, and in my case, the producer is the debezium-connector and the topics are also created by it. So, I just need to use the consumer to read the messages from the topics
So, I configured my consumer via integrant like this...
(defmethod ig/prep-key ::consumer
  [_ {:keys [kafka-brokers kafka-group enable-auto-commit
             topics max-poll-records]
      :or {kafka-brokers "localhost:9092" kafka-group "myapp"
           enable-auto-commit false max-poll-records "100"}}]
  (timbre/info "Preparing consumer")
  {"bootstrap.servers" kafka-brokers
   "group.id" kafka-group
   "enable.auto.commit" enable-auto-commit
   "auto.offset.reset" "earliest"
   ;; Environment variables are always strings
   "max.poll.records" (Integer/parseInt max-poll-records)
   "topics" (if topics
              (mapv #(hash-map :topic-name %) (str/split topics #","))
              (throw (IllegalArgumentException.
                      "Kafka topics are required. You need to specify at least one topic.")))
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
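(defmethod ig/init-key ::consumer [_ config]
  (timbre/info "Configuring Kafka consumer" config)
  ;; "topics" is a string key in the config map and is not a Kafka setting,
  ;; so remove it before creating the consumer
  (-> (jc/consumer (dissoc config "topics"))
      (jc/subscribe (get config "topics"))))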
(defmethod ig/halt-key! ::consumer [_ consumer]
  (timbre/info "Stopping Kafka consumer")
  (when consumer
    (.close consumer)))
Now, any idea how I consume the messages using this consumer?
Basically, I'm stuck on how to get the topic name: if Debezium is the one creating the topic and producing into it, how do I point the consumer at that topic?
I can even keep an eye on the Kafka logs for all the updates also via this command
docker run -it --network=docker-debezium_default --rm edenhill/kafkacat:1.6.0 kafkacat -C -b kafka:9092 -t myapp.public.chatrooms -o -10
where myapp.public.chatrooms is the topic where all the updates are being produced
but how do I use it in the code?
Hi @kishorekaranam99 I’ll have a look at your example shortly
Thank you @gphilipp, much appreciated.
Hey @gphilipp, did you have the chance to take a look at it? if not please do it whenever you can. Thank you.
ohkay
jc/subscribe (and jc/consumer) both return the consumer; you then need to call jc/poll on that consumer to read the messages.
jc/poll returns only one 'batch' of the currently readable messages per call, so you tend to call it continuously in a while loop:
(while @run
  (doseq [msg (jc/poll consumer 1000)]
    (println msg)))
The topic name format on the connector is likely to be configurable; the default is apparently server.schema.table, so you should know the value in advance.
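To make the naming concrete, here is a minimal sketch, assuming the connector really uses the server.schema.table default mentioned above. The helper name debezium-topic and the second table "users" are made up for illustration; only myapp.public.chatrooms comes from this thread.

```clojure
(require '[clojure.string :as str])

(defn debezium-topic
  "Hypothetical helper: joins the parts in the server.schema.table form.
  Assumes the connector uses that default naming."
  [server schema table]
  (str/join "." [server schema table]))

(debezium-topic "myapp" "public" "chatrooms")
;; => "myapp.public.chatrooms"

;; Comma-separated value for the `topics` option of the consumer config above
;; ("users" is a made-up second table, for illustration only).
(str/join "," (map #(debezium-topic "myapp" "public" %) ["chatrooms" "users"]))
;; => "myapp.public.chatrooms,myapp.public.users"
```

The resulting string is in the shape the prep-key method above expects, since it splits `topics` on commas.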
Looks like jackdaw doesn't wrap it, but you can use Java interop on the consumer to subscribe to a topic-name Pattern, which may be more helpful here. So you could subscribe to #"myapp\.public\..*", for example, to pick up all the created topics.
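A rough sketch of that interop, assuming the consumer is the KafkaConsumer returned by jc/consumer and that your kafka-clients version has the single-argument subscribe(Pattern) method. The names topic-pattern, topic-matches? and subscribe-by-pattern are hypothetical, not part of jackdaw.

```clojure
;; A Clojure regex literal is already a java.util.regex.Pattern.
(def topic-pattern #"myapp\.public\..*")

(defn topic-matches?
  "True when the whole topic name matches topic-pattern.
  Handy for checking the regex at the REPL before subscribing."
  [topic]
  (some? (re-matches topic-pattern topic)))

(defn subscribe-by-pattern
  "Subscribes the consumer to every topic matching pattern and returns the
  consumer, mirroring what jc/subscribe does. The call is reflective here;
  a ^KafkaConsumer type hint would avoid that in real code."
  [consumer pattern]
  (.subscribe consumer pattern)
  consumer)

(topic-matches? "myapp.public.chatrooms")
;; => true
(topic-matches? "otherapp.public.chatrooms")
;; => false
```

In the init-key method above this would stand in for the jc/subscribe call, for example (-> (jc/consumer config) (subscribe-by-pattern topic-pattern)).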
When using the consumer directly the pattern to go with is like this:
(while my-app-is-running
  (let [records (jackdaw.client/poll consumer (Duration/ofMillis 1000))]
    (doseq [{:keys [value]} records]
      ;; do something with the value
      )
    (.commitSync consumer)))
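One way to make my-app-is-running concrete is an atom that you flip to stop the loop. The sketch below is only an illustration of that idea: consume-loop! and its argument names are hypothetical, and polling and committing are passed in as functions so the loop can be tried at the REPL without a broker.

```clojure
(defn consume-loop!
  "Polls until running? (an atom) becomes false.
  poll-fn   : zero-arg fn returning a (possibly empty) seq of records
  handle-fn : called once per record
  commit-fn : zero-arg fn, called after each non-empty batch"
  [running? poll-fn handle-fn commit-fn]
  (while @running?
    (let [records (poll-fn)]
      (doseq [record records]
        (handle-fn record))
      ;; Skipping the commit for empty batches is a choice made in this
      ;; sketch; the loop above commits on every iteration.
      (when (seq records)
        (commit-fn)))))

(comment
  ;; Possible wiring with the consumer from this thread (assumes jc is
  ;; jackdaw.client and java.time.Duration is imported):
  (def running? (atom true))
  (future
    (consume-loop! running?
                   (fn [] (jc/poll consumer (Duration/ofMillis 1000)))
                   (fn [{:keys [value]}] (println value))
                   (fn [] (.commitSync consumer))))
  ;; later, to stop the loop:
  (reset! running? false))
```

Since enable.auto.commit is false in the config above, the commit step is what moves the group's offsets forward. KafkaConsumer is not safe for use from multiple threads, so it is probably best to poll, commit and close on the same thread.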
The other alternative is to use the Kafka Streams DSL, have a look at https://github.com/FundingCircle/jackdaw/blob/master/examples/word-count/src/word_count.clj