2021-12-04
Hello. I'm a long-time Clojure user, but I'm trying to get into Kafka Streams, and jackdaw seemed like the logical choice 🙂 However, I'm using Confluent Cloud, and our source topics are exported by Debezium using Avro. I've been trying to find examples of working Avro setups with Confluent Cloud. I'm able to consume messages, but even when supplying a schema, this is what I keep getting in my REPL:
(require '[jackdaw.client :as jc]              ;; aliases assumed from the calls below
         '[jackdaw.serdes.resolver :as resolver])

(def resolver
  (resolver/serde-resolver :schema-registry-url (:schema.registry.url kafka-config)))

(def project-users
  {:topic-name "intra.public.projects_users"
   :key-serde (resolver {:serde-keyword :jackdaw.serdes.avro.confluent/serde
                         :schema-filename "all-keys.avsc"
                         :key? true})
   :value-serde (resolver {:serde-keyword :jackdaw.serdes.avro.confluent/serde
                           :schema-filename "project-users.avsc"
                           :key? false})})

(def consumer (-> (jc/consumer kafka-config) (jc/subscribe [project-users])))
(def records (jc/poll consumer 5000))
(close! consumer)
(take 2 records)
=>
({:serialized-key-size 8,
  :key 86267,
  :offset 4500,
  :value #object[org.apache.avro.generic.GenericData$Record
                 0x9de40225
                 "{\"id\": 86267, \"project_id\": 59, \"user_id\": 11706, \"created_at\": 1433855502838, \"updated_at\": 1474973472856, \"occurrence\": 0, \"final_mark\": 0, \"retriable_at\": 1451576927116, \"marked_at\": 1474973472856}"],
  :serialized-value-size 46,
  :partition 0,
  :headers #object[org.apache.kafka.common.header.internals.RecordHeaders
                   0xc867b584
                   "RecordHeaders(headers = [], isReadOnly = false)"],
  :topic-name "intra.public.projects_users",
  :timestamp-type :timestamp-type/create,
  :timestamp 1637064977383}
 {:serialized-key-size 9,
  :key 1973166,
  :offset 4501,
  :value #object[org.apache.avro.generic.GenericData$Record
                 0xdc3ce8f3
                 "{\"id\": 1973166, \"project_id\": 1640, \"user_id\": 22713, \"created_at\": 1598670068302, \"updated_at\": 1598670068302, \"occurrence\": 0, \"final_mark\": 100, \"retriable_at\": 1598670068302, \"marked_at\": 1502279970000}"],
  :serialized-value-size 49,
  :partition 0,
  :headers #object[org.apache.kafka.common.header.internals.RecordHeaders
                   0xdebb281e
                   "RecordHeaders(headers = [], isReadOnly = false)"],
  :topic-name "intra.public.projects_users",
  :timestamp-type :timestamp-type/create,
  :timestamp 1637064977383})
I'm not getting any errors during schema loading, but it seems as if the schemas are not used at all: I can introduce errors into them and I'm still able to poll with no errors, and GenericData$Record doesn't seem very idiomatic 😅 Any help with this newbie question is much appreciated.
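For reference, a minimal sketch (untested, and an assumption about what's going wrong) of handing the serdes to the consumer constructor rather than only to subscribe: jackdaw.client/consumer also has a two-argument arity that takes a topic config and builds the KafkaConsumer with that topic's :key-serde and :value-serde as its deserializers, whereas jc/subscribe only subscribes to the topic name. kafka-config and project-users below are the ones defined in the snippet above.

;; Sketch, assuming jc is jackdaw.client: construct the consumer from both the
;; Kafka config and the topic config so the Avro serdes above are actually used
;; for deserialization. With the 1-arity (jc/consumer kafka-config), decoding
;; falls back to whatever deserializers the config specifies, which is
;; consistent with getting raw GenericData$Record values back.
(def consumer
  (-> (jc/consumer kafka-config project-users)
      (jc/subscribe [project-users])))

(def records (jc/poll consumer 5000))
(take 2 records)
;; If the serdes are applied, :value should come back decoded as Clojure data
;; (a map of the record's fields) rather than a GenericData$Record.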