This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-10-08
Is it possible to configure a stream with Avro? I'm having some difficulty getting it to work. I'm using this example as a starting point
(def streams-config
  {"application.id" "word-count"
   "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})
but haven't quite figured out what to specify for the default serdes.
{:topic-name #profile {:default #env FOO_TOPIC
                       :local "foo-bar"}
 :replication-factor #long #profile {:default #or [#env REPLICATION_FACTOR 1]
                                     :local 1}
 :partition-count #long #profile {:default #or [#env PARTITION_COUNT 1]
                                  :local 1}
 :poll-interval-ms 1000
 :seek #profile {:default #or [#env FOO_TOPIC_SEEK :latest]
                 :local :latest}
 :key-serde :string
 :value-serde :json}]
Hey, thanks for Jackdaw, I think it’s great :thumbsup: I was wondering when the next release is going to be? I'm currently using 0.6.8
but need to monkey patch it to use topics-ready?
(which was fixed on master with https://github.com/FundingCircle/jackdaw/commit/9cd9524ae2eab26694eceafab882f4396f6e804e)
@dbernal For Avro, if you define your topic metadata like this: https://github.com/FundingCircle/jackdaw/blob/master/examples/serdes/src/serdes.clj#L132-L134
You can use the serde resolver: https://github.com/FundingCircle/jackdaw/blob/master/examples/serdes/src/serdes.clj#L147-L154
It may not be possible to specify Avro as the default value serde. We would need to expose a class and I don’t think Jackdaw has one for Avro.
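A rough sketch of the per-topic approach described above, in case it helps: the serdes are declared as data on the topic and then resolved, rather than set as stream-wide defaults. This assumes the `jackdaw.serdes.resolver/serde-resolver` function used in the linked example; the schema filenames and the `resolve-serdes` helper are hypothetical, and the linked file is the authoritative reference.

```clojure
(ns example.avro-topic
  (:require [jackdaw.serdes.resolver :as resolver]))

;; Topic metadata with serdes described as data.
;; The .avsc filenames are hypothetical classpath resources.
(def seller-topic
  {:topic-name "test.c.dbo.Seller"
   :key-serde {:serde-keyword :jackdaw.serdes.avro.confluent/serde
               :schema-filename "seller-key.avsc"
               :key? true}
   :value-serde {:serde-keyword :jackdaw.serdes.avro.confluent/serde
                 :schema-filename "seller-value.avsc"
                 :key? false}})

(defn resolve-serdes
  "Replaces the :key-serde and :value-serde descriptions in `topic`
  with serde instances produced by the resolver."
  [topic schema-registry-url]
  (let [resolve-serde (resolver/serde-resolver
                       :schema-registry-url schema-registry-url)]
    (-> topic
        (update :key-serde resolve-serde)
        (update :value-serde resolve-serde))))

(comment
  ;; Requires the schema files on the classpath and a reachable registry.
  (resolve-serdes seller-topic "http://localhost:8081"))
```

The resolved topic map can then be passed wherever a topic config with serdes is expected, so no default Avro serde class is needed in the streams config.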
Ok gotcha. So my use case is having a KTable that I can query with .get. I'm able to create the table with the Avro serde but haven't been able to get it working alongside the KafkaStream. Is the issue that there's no equivalent class to jackdaw.serdes.EdnSerde that uses Avro and the schema registry?
(defn avro-serde [registry]
  (avro/serde (merge avro/+base-schema-type-registry+
                     avro/+UUID-type-registry+)
              {:avro.schema-registry/url (str "http://" registry)}
              {:key? true}))
(defn store [] (Stores/inMemoryKeyValueStore "seller-store"))
(defn ktable [sb store]
  (js/ktable sb
             [{:topic-name "test.c.dbo.Seller"
               :key-serde (avro-serde "localhost:8081")
               :value-serde (avro-serde "localhost:8081")}]
             store))
(defn foo
  []
  (let [builder (js/streams-builder)
        store (Stores/inMemoryKeyValueStore "seller-store")
        table (js/ktable builder {:topic-name "test.c.dbo.Seller"}
                         store)
        kafka-stream (js/kafka-streams builder
                                       {"bootstrap.servers" "localhost:9092"
                                        "application.id" "foo-test-now"
                                        "schema.registry.url" "localhost:8081"})]
    (js/start kafka-stream)
    {:stream kafka-stream
     :table table}))
BTW, I’ve been refactoring some of the examples. In its current form, the Serde example may not work.
@thomas.ormezzano I can do a release later today.
What is the standard way of handling errors when processing a topology with jackdaw? There doesn’t seem to be an error-handling method.
I usually set "default.deserialization.exception.handler"
to "org.apache.kafka.streams.errors.LogAndFailExceptionHandler"
in the streams config.
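For illustration, here is how that setting might look when added to the word-count config from earlier in the thread. The var name `streams-config-with-handler` is made up for this sketch; the key and class name are the ones quoted above.

```clojure
;; Streams config with an explicit deserialization exception handler.
;; LogAndFailExceptionHandler logs the bad record and stops processing.
(def streams-config-with-handler
  {"application.id" "word-count"
   "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler"
   "org.apache.kafka.streams.errors.LogAndFailExceptionHandler"})
```

Kafka Streams also ships `org.apache.kafka.streams.errors.LogAndContinueExceptionHandler`, which logs and skips the record instead of failing.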
@creese @ticean thanks for the reading tips! I had consumed most of what I could find from Greg Young, including his half-written book. What I haven't read yet is anything that gets into the weeds enough to prepare me to write anything similar to what you've presented from FundingCircle. If you've read Enterprise Integration Patterns (just reading it now), do these patterns fit into your designs?
@callum838 we just use shutdown hooks https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams
which of course in Clojure looks better than that
but it's probably not really documented anywhere since it's not really part of the library I guess
Sorry, my answer was only for deserialization exceptions. For other exceptions, @andrea.crotti is correct.
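A minimal sketch of the shutdown-hook approach mentioned above, using plain JVM interop. The helper name `add-shutdown-hook!` is hypothetical and not part of Jackdaw; `kafka-stream` in the usage comment stands for an already started `KafkaStreams` instance.

```clojure
(defn add-shutdown-hook!
  "Registers the no-arg function `f` to run when the JVM shuts down.
  Returns the hook Thread so it can be removed later if needed."
  [f]
  (let [hook (Thread. ^Runnable f)]
    (.addShutdownHook (Runtime/getRuntime) hook)
    hook))

(comment
  ;; Close the streams app cleanly on SIGTERM / JVM exit.
  (add-shutdown-hook! #(.close kafka-stream)))
```

Returning the thread keeps the hook removable via `Runtime.removeShutdownHook`, which is handy at the REPL when the app is restarted several times.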