This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-03-11
Channels
- # aleph (50)
- # announcements (20)
- # babashka (68)
- # beginners (70)
- # calva (25)
- # cider (1)
- # clj-kondo (10)
- # cljs-dev (3)
- # clojars (1)
- # clojure (113)
- # clojure-australia (7)
- # clojure-berlin (4)
- # clojure-europe (47)
- # clojure-italy (14)
- # clojure-nl (2)
- # clojure-norway (1)
- # clojure-serbia (5)
- # clojure-spec (11)
- # clojure-uk (8)
- # clojurescript (16)
- # community-development (2)
- # conjure (2)
- # cursive (15)
- # data-oriented-programming (35)
- # datahike (12)
- # datascript (5)
- # datomic (13)
- # duct (7)
- # fulcro (21)
- # graalvm (94)
- # graphql (1)
- # helix (4)
- # honeysql (19)
- # jackdaw (8)
- # jobs (2)
- # jobs-rus (1)
- # leiningen (1)
- # malli (32)
- # missionary (1)
- # mount (1)
- # off-topic (40)
- # perun (2)
- # portal (7)
- # reitit (10)
- # rewrite-clj (26)
- # shadow-cljs (90)
- # spacemacs (29)
- # sql (17)
- # tools-deps (49)
- # wasm (1)
- # xtdb (26)
I am both a Kafka and jackdaw newbie, with some extremely basic questions; I have configured an existing app to publish JSON to a topic, and for my first jackdaw-based client, I want to subscribe to that topic, and read the messages (Once I grok this, I will transition to a streams-based approach). I have it working (see below), but the JSON decoding is all "manual", every attempt I've made to specify/use a JSON serde has failed (see commented out lines), either I misunderstand what serde does, or I am doing it wrong. Any advice appreciated...
(ns ha
(:require
[clojure.data.json :as json]
[jackdaw.client :as jc]
[jackdaw.client.log :as jl]
[jackdaw.serdes.json :as jsj])
(:import
(org.apache.kafka.common.serialization Serdes)))
(def consumer-config
{"bootstrap.servers" "192.168.132.202:9090"
"group.id" "test-ha-consumer"
;; "key.deserializer" (jsj/deserializer)
;; "value.deserializer" (jsj/deserializer)
;; "value.deserializer" (jsj/from-bytes)
"key.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
(def topic-foo
{:topic-name "waverley.home-assistant"
;; :key-serde (jsj/serde)
;; :value-serde (jsj/serde)
})
(defn foo
[]
(with-open [my-consumer (-> (jc/consumer consumer-config)
(jc/subscribe [topic-foo]))]
(doseq [{:keys [key value partition timestamp offset] :as message} (jl/log my-consumer 1000)]
;; (println "keys: " (keys message))
(println "key: " key)
(println "value: " (json/read-str (jsj/from-bytes value) :key-fn keyword))
(println "partition: " partition)
(println "timestamp: " timestamp)
(println "offset: " offset))))
Ah I think you're not far off. Think what you need to make it work is to move the bit you have commented out from topic-foo
to the 2nd parameter in the call to jc/consumer
. Here's the corresponding function signature you're trying to match https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/client.clj#L96
Thanks! If I do that, do I still need to specify key and value .deserializer
in consumer-config
?
You can do. That gets used as the default (often the consumer config is shared between lots of invocations of jc/consumer)
OK, that works! Thank you!!!
So, the key and value -serde in t
supersede the consumer-config deserializers?
I just tried it both ways, leaving the deserializers in, and removing them, AFAICT both worked fine.....
I can also explain why this bit doesn't work as you might expect
;; "key.deserializer" (jsj/deserializer)
;; "value.deserializer" (jsj/deserializer)
When this configuration is provided as a consumer-config, it's as if a properties file is being used. The values must be the names of classes that the kafka can find at runtime. So in theory you could do (name (class (jsj/deserializer)))
. Unfortunately kafka looks for the class using the standard class-loader which isn't smart enough to find classes defined dynamically by clojure.