Fork me on GitHub
#jackdaw
<
2021-04-19
>
finchharold11:04:41

I'm trying to use the Kafka producer and consumer concept, and in my case, the producer is the debezium-connector and the topics are also created by it. So, I just need to use the consumer to read the messages from the topics

finchharold11:04:20

So, I configured my consumer via integrant like this...

(defmethod ig/prep-key ::consumer
  [_ {:keys [kafka-brokers kafka-group enable-auto-commit
             topics max-poll-records]
      :or {kafka-brokers "localhost:9092" kafka-group "myapp"
           enable-auto-commit false max-poll-records "100"}}]
  (timbre/info "Preparing consumer")
  {"bootstrap.servers" kafka-brokers
   "group.id" kafka-group
   "enable.auto.commit" enable-auto-commit
   "auto.offset.reset" "earliest"
   ;; Enviroment variable is always string
   "max.poll.records" (Integer/parseInt max-poll-records)
   "topics" (if topics
              (mapv #(hash-map :topic-name %) (str/split topics #","))
              (throw (IllegalArgumentException. "Kafka topics are
              required. You need specify atleast one topic.")))
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})


(defmethod ig/init-key ::consumer [_ config]
  (timbre/info "Configuring Kafka consumer" config)
  (-> (jc/consumer (dissoc config :topics))
      (jc/subscribe (get config "topics"))))


(defmethod ig/halt-key! ::consumer [_ consumer]
  (timbre/info "Stopping Kafka consumer")
  (when consumer
    (.close consumer)))

finchharold11:04:36

Now any idea how do I consume the messages using this consumer...?

finchharold11:04:16

basically, I got stuck on how to get the topic name, like if the debezium is the one that is creating the topic and producing into it then how do I refer that to the consumer to use it...?

finchharold11:04:23

I can even keep an eye on the Kafka logs for all the updates also via this command

docker run -it --network=docker-debezium_default --rm edenhill/kafkacat:1.6.0 kafkacat -C -b kafka:9092 -t myapp.public.chatrooms -o -10

finchharold11:04:41

where myapp.public.chatrooms is the topic where all the updates are being produced

finchharold11:04:48

but how do I use it in the code?

gphilipp11:04:16

Hi @kishorekaranam99 I’ll have a look at your example shortly

finchharold11:04:39

Thank you @gphilipp, much appreciated.

finchharold12:04:32

Hey @gphilipp, did you have the chance to take a look at it? if not please do it whenever you can. Thank you.

Daniel Stephens15:04:23

jc/subscribe (and jc/consumer) both return the consumer, you then need to call jc/poll on that consumer to read the messages. jc/poll returns just the first 'batch' of readable messages so you tend to call this in a while loop continuously:

(while @run
  (doseq [msg (jc/poll consumer 1000)]
    (println msg)))

Daniel Stephens15:04:36

The topic name format on the connector is likely to be configurable, the default is apparently server.schema.table so you should know the value in advance

Daniel Stephens15:04:00

Looks like jackdaw doesn't wrap it, but you can use java interop on the consumer to subscribe to a topic-name Pattern which may be more helpful here, so you could subscribe to #"myapp\.public\..*" for example to pick up all the created topics.

gphilipp15:04:16

When using the consumer directly the pattern to go with is like this:

(while my-app-is-running
  (let [records (jackdaw.client/poll consumer (Duration/ofMillis 1000))]
      (doseq [{:keys [value]} records]
        ;; do something with the value
      )
      (.commitSync consumer)))