#jackdaw
2021-12-04
Nicolas Estrada 21:12:04

Hello. I'm a long-time Clojure user, but I'm trying to get into Kafka Streams, and jackdaw seemed like the logical choice 🙂 However, I'm using Confluent Cloud, and our source topics are exported by Debezium using Avro. I've been trying to find examples of working Avro setups with CC. I'm able to consume messages properly, but even when supplying a schema, this is what I keep getting in my REPL:

(def resolver (resolver/serde-resolver :schema-registry-url (:schema.registry.url kafka-config)))
(def project-users
  {:topic-name  "intra.public.projects_users"
   :key-serde   (resolver {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                           :schema-filename "all-keys.avsc"
                           :key?            true})
   :value-serde (resolver {:serde-keyword   :jackdaw.serdes.avro.confluent/serde
                           :schema-filename "project-users.avsc"
                           :key?            false})})
(def consumer (-> (jc/consumer kafka-config) (jc/subscribe [project-users])))
(def records (jc/poll consumer 5000))
(close! consumer)
(take 2 records)
=>
({:serialized-key-size 8,
  :key 86267,
  :offset 4500,
  :value #object[org.apache.avro.generic.GenericData$Record
                 0x9de40225
                 "{\"id\": 86267, \"project_id\": 59, \"user_id\": 11706, \"created_at\": 1433855502838, \"updated_at\": 1474973472856, \"occurrence\": 0, \"final_mark\": 0, \"retriable_at\": 1451576927116, \"marked_at\": 1474973472856}"],
  :serialized-value-size 46,
  :partition 0,
  :headers #object[org.apache.kafka.common.header.internals.RecordHeaders
                   0xc867b584
                   "RecordHeaders(headers = [], isReadOnly = false)"],
  :topic-name "intra.public.projects_users",
  :timestamp-type :timestamp-type/create,
  :timestamp 1637064977383}
 {:serialized-key-size 9,
  :key 1973166,
  :offset 4501,
  :value #object[org.apache.avro.generic.GenericData$Record
                 0xdc3ce8f3
                 "{\"id\": 1973166, \"project_id\": 1640, \"user_id\": 22713, \"created_at\": 1598670068302, \"updated_at\": 1598670068302, \"occurrence\": 0, \"final_mark\": 100, \"retriable_at\": 1598670068302, \"marked_at\": 1502279970000}"],
  :serialized-value-size 49,
  :partition 0,
  :headers #object[org.apache.kafka.common.header.internals.RecordHeaders
                   0xdebb281e
                   "RecordHeaders(headers = [], isReadOnly = false)"],
  :topic-name "intra.public.projects_users",
  :timestamp-type :timestamp-type/create,
  :timestamp 1637064977383})

Nicolas Estrada 21:12:46

I'm not getting any errors during schema loading, but it seems as if the schemas are not used at all. I can introduce errors into them and still poll without errors, and GenericData$Record doesn't seem very idiomatic 😅 Any help with this newbie question is much appreciated.