#jackdaw
2021-03-11
dcj 16:03:25

I am both a Kafka and jackdaw newbie, with some extremely basic questions. I have configured an existing app to publish JSON to a topic, and for my first jackdaw-based client I want to subscribe to that topic and read the messages (once I grok this, I will move to a streams-based approach). I have it working (see below), but the JSON decoding is all "manual": every attempt I've made to specify/use a JSON serde has failed (see the commented-out lines). Either I misunderstand what a serde does, or I am doing it wrong. Any advice appreciated...

(ns ha
  (:require
   [clojure.data.json :as json]
   [jackdaw.client :as jc]
   [jackdaw.client.log :as jl]
   [jackdaw.serdes.json :as jsj])
  (:import
    (org.apache.kafka.common.serialization Serdes)))

(def consumer-config
  {"bootstrap.servers" "192.168.132.202:9090"
   "group.id"  "test-ha-consumer"
   ;; "key.deserializer" (jsj/deserializer)
   ;; "value.deserializer" (jsj/deserializer)
   ;; "value.deserializer" (jsj/from-bytes)
   "key.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"
   })

(def topic-foo
  {:topic-name "waverley.home-assistant"
   ;; :key-serde (jsj/serde)
   ;; :value-serde (jsj/serde)
   })

(defn foo
  []
  (with-open [my-consumer (-> (jc/consumer consumer-config)
                              (jc/subscribe [topic-foo]))]
    (doseq [{:keys [key value partition timestamp offset] :as message} (jl/log my-consumer 1000)]
      ;; (println "keys: " (keys message))
      (println "key: " key)
      (println "value: " (json/read-str (jsj/from-bytes value) :key-fn keyword))
      (println "partition: " partition)
      (println "timestamp: " timestamp)
      (println "offset: " offset))))

cddr 16:03:49

Ah, I think you're not far off. To make it work, move the serdes you have commented out in topic-foo into the second argument of the call to jc/consumer (a map with :key-serde and :value-serde). Here's the function signature you're trying to match: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/client.clj#L96
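A minimal sketch of the suggested change. It assumes the jackdaw version linked above, where jc/consumer takes an optional second map with :key-serde and :value-serde, and that both keys and values on the topic are JSON. Because topic-foo already carries the serdes, it can double as that second argument. The value then arrives as Clojure data, so the manual json/read-str call goes away.

```clojure
(ns ha
  (:require
   [jackdaw.client :as jc]
   [jackdaw.client.log :as jl]
   [jackdaw.serdes.json :as jsj]))

;; No key/value deserializer entries here: the serdes passed to jc/consumer
;; below supply the deserializers.
(def consumer-config
  {"bootstrap.servers" "192.168.132.202:9090"
   "group.id"          "test-ha-consumer"})

;; Assumes keys and values are both JSON on this topic.
(def topic-foo
  {:topic-name  "waverley.home-assistant"
   :key-serde   (jsj/serde)
   :value-serde (jsj/serde)})

(defn foo
  []
  ;; topic-foo is passed twice: as the serde map for jc/consumer (which reads
  ;; :key-serde and :value-serde from it) and as the topic to subscribe to.
  (with-open [my-consumer (-> (jc/consumer consumer-config topic-foo)
                              (jc/subscribe [topic-foo]))]
    ;; jl/log polls every 1000 ms and yields records whose key/value have
    ;; already been decoded by the JSON serdes.
    (doseq [{:keys [key value partition timestamp offset]} (jl/log my-consumer 1000)]
      (println "key: " key)
      (println "value: " value) ; already Clojure data, no json/read-str needed
      (println "partition: " partition)
      (println "timestamp: " timestamp)
      (println "offset: " offset))))
```

Like the original, foo polls indefinitely; stop it from the REPL when done.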

dcj 16:03:40

Thanks! If I do that, do I still need to specify key.deserializer and value.deserializer in consumer-config?

cddr 16:03:33

You can. Those entries are used as the default (the consumer config is often shared between lots of invocations of jc/consumer).
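To illustrate the default-versus-override behaviour, here is a sketch assuming kafka-clients and jackdaw are on the classpath; base-config, the broker address, the group id and the two helper functions are hypothetical placeholders. One shared config names ByteArrayDeserializer for both key and value. A consumer built from the config alone uses those classes, while one built with a serde map uses the serdes' deserializers instead. Constructing a KafkaConsumer does not contact the broker, so this runs without one.

```clojure
(ns shared-config
  (:require [jackdaw.client :as jc]
            [jackdaw.serdes.json :as jsj])
  (:import (org.apache.kafka.common.serialization Serdes)))

;; Shared base config (hypothetical values). The deserializer class names are
;; the fallback for any jc/consumer call made without a serde map.
(def base-config
  {"bootstrap.servers"  "localhost:9092"   ; placeholder broker address
   "group.id"           "example-group"    ; placeholder group id
   "key.deserializer"   "org.apache.kafka.common.serialization.ByteArrayDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"})

;; Config only: records come back as raw byte arrays.
(defn raw-consumer []
  (jc/consumer base-config))

;; Same config plus explicit serdes: their deserializers take precedence
;; over the class names in base-config.
(defn json-consumer []
  (jc/consumer base-config {:key-serde   (Serdes/String)
                            :value-serde (jsj/serde)}))
```

This matches what dcj observed: once the serdes are passed explicitly, keeping or dropping the deserializer entries makes no difference.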

dcj 17:03:14

OK, that works! Thank you!!! So the :key-serde and :value-serde in t supersede the consumer-config deserializers? I just tried it both ways, leaving the deserializers in and removing them, and AFAICT both worked fine...

cddr 17:03:38

I can also explain why this bit doesn't work as you might expect:

;; "key.deserializer" (jsj/deserializer)
;; "value.deserializer" (jsj/deserializer)

When this configuration is provided as a consumer-config, it's as if a properties file is being used: the values must be the names of classes that Kafka can find at runtime. So in theory you could do (.getName (class (jsj/deserializer))). Unfortunately, Kafka looks up the class with the standard class loader, which isn't smart enough to find classes defined dynamically by Clojure.
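The naming half of this can be checked at a plain Clojure REPL, with no Kafka involved. The sketch below (all names are hypothetical helpers) shows that clojure.core/name rejects a java.lang.Class while .getName returns the fully qualified name, and that an object created with reify has a class whose synthetic name Clojure generates at runtime. It does not exercise Kafka's own class lookup.

```clojure
(ns class-names)

;; Properties-style Kafka config values such as "value.deserializer" are
;; class names (strings), not objects.
(defn class-name-of
  "Returns the fully qualified name of x's class."
  [x]
  (.getName (class x)))

;; clojure.core/name only accepts strings, keywords and symbols, so calling
;; it on a java.lang.Class throws a ClassCastException.
(defn name-of-class-throws?
  [x]
  (try
    (name (class x))
    false
    (catch ClassCastException _ true)))

;; reify generates a new class at runtime; its name is synthetic and
;; contains "reify".
(def dynamic-instance
  (reify Object
    (toString [_] "dynamic")))
```

For example, (class-name-of "abc") returns "java.lang.String", while (class-name-of dynamic-instance) returns a generated name that only exists inside the running Clojure runtime.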

dcj 17:03:45

Super helpful, thank you! IIRC the error message I got was pretty clear that I was passing in an instance instead of a class... but my Java-fu is weak too.

👍 3
dcj 17:03:04

Thank you again for your help!!! Now I can try streaming....

🚀 3