#jackdaw
2019-10-08
dbernal08:10:16

Is it possible to configure a stream with Avro? I'm having some difficulty getting it to work. I'm using this example as a starting point:

(def streams-config
  {"application.id" "word-count"
   "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})
But I haven't quite figured out what to specify for the default serdes.

dharrigan08:10:18

I've taken a (slightly) different approach

dharrigan08:10:06

{:topic-name #profile {:default #env FOO_TOPIC
                       :local "foo-bar"}
 :replication-factor #long #profile {:default #or [#env REPLICATION_FACTOR 1]
                                     :local 1}
 :partition-count #long #profile {:default #or [#env PARTITION_COUNT 1]
                                  :local 1}
 :poll-interval-ms 1000
 :seek #profile {:default #or [#env FOO_TOPIC_SEEK :latest]
                 :local :latest}
 :key-serde :string
 :value-serde :json}

dharrigan08:10:44

then, in my kafka client, I simply use the serde/serde-map to process it

dharrigan08:10:02

let the library figure out what to do 🙂

dbernal08:10:10

oh interesting, I'll give that a try

tzzh10:10:01

Hey, thanks for Jackdaw, I think it’s great 👍 I was wondering when the next release is going to be? I'm currently using 0.6.8 but need to monkey patch it to use topics-ready? (which was fixed on master with https://github.com/FundingCircle/jackdaw/commit/9cd9524ae2eab26694eceafab882f4396f6e804e)

creese11:10:02

It may not be possible to specify Avro as the default value serde. We would need to expose a class and I don’t think Jackdaw has one for Avro.

dbernal15:10:32

Ok gotcha. So my use case is having a KTable that I can query with .get. I'm able to create the table with the Avro serde but haven't been able to get it working alongside the KafkaStream. Is the issue that there's no equivalent class to jackdaw.serdes.EdnSerde that uses Avro and the schema registry?

dbernal15:10:45

(defn avro-serde [registry]
  (avro/serde (merge avro/+base-schema-type-registry+
                     avro/+UUID-type-registry+)
              {:avro.schema-registry/url (str "http://" registry)}
              {:key? true}))

(defn store [] (Stores/inMemoryKeyValueStore "seller-store"))

(defn ktable [sb store]
  (js/ktable sb
             [{:topic-name "test.c.dbo.Seller"
               :key-serde (avro-serde "localhost:8081")
               :value-serde (avro-serde "localhost:8081")}]
             store))

(defn foo
  []
  (let [builder (js/streams-builder)
        store (Stores/inMemoryKeyValueStore "seller-store")
        table (js/ktable builder {:topic-name "test.c.dbo.Seller"}
                         store)
        kafka-stream (js/kafka-streams builder
                                       {"bootstrap.servers" "localhost:9092"
                                        "application.id" "foo-test-now"
                                        "schema.registry.url" "localhost:8081"})]
    (js/start kafka-stream)
    {:stream kafka-stream
     :table table}))

dbernal15:10:55

code for context

creese11:10:45

BTW, I’ve been refactoring some of the examples. In its current form, the Serde example may not work.

creese11:10:33

@thomas.ormezzano I can do a release later today.

tzzh11:10:00

oh thanks that’d be great 😻

callumcodes14:10:00

What is the standard way of handling errors when processing a topology with jackdaw? There doesn’t seem to be an error handling method

creese17:10:57

I usually set "default.deserialization.exception.handler" to "org.apache.kafka.streams.errors.LogAndFailExceptionHandler" in the streams config.

creese17:10:38

If there is an exception the app can’t handle, it logs the error and crashes.

creese17:10:57

I believe you can also log and skip but I haven’t tried that.
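
For reference, a minimal sketch of the two deserialization handler settings mentioned above. The application id and broker address are placeholders borrowed from the word-count config earlier in the channel, and the name `skipping-streams-config` is made up for illustration. `LogAndContinueExceptionHandler` is the Kafka Streams class for "log and skip"; as noted, nobody in this thread has tried it.

```clojure
;; Streams config that logs a bad record and then fails the application.
(def streams-config
  {"application.id"    "word-count"          ; placeholder
   "bootstrap.servers" "localhost:9092"      ; placeholder
   "default.deserialization.exception.handler"
   "org.apache.kafka.streams.errors.LogAndFailExceptionHandler"})

;; Variant (hypothetical name) that logs the bad record and keeps going.
(def skipping-streams-config
  (assoc streams-config
         "default.deserialization.exception.handler"
         "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"))
```

Either map can be passed as the config argument to js/kafka-streams. These handlers only cover records that fail to deserialize, not exceptions thrown inside the topology.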

Travis Brown15:10:12

@creese @ticean thanks for the reading tips! I had consumed most of what I could find from Greg Young, including his half-written book. What I haven't read yet is anything that gets into the weeds enough to prepare me to write anything similar to what you've presented from FundingCircle. If you've read Enterprise Integration Patterns (just reading it now), do these patterns fit into your designs?

creese17:10:23

Sorry, I haven’t read that.

ticean17:10:59

Nor have I. Added to my reading list. Thanks.

andrea.crotti16:10:55

which of course in Clojure looks better than that

andrea.crotti16:10:27

but it's probably not really documented anywhere since it's not really part of the library I guess

creese17:10:58

Sorry, my answer was only for deserialization exceptions. For other exceptions, @andrea.crotti is correct.

creese17:10:38

For the accounting system, we do something like, (.setUncaughtExceptionHandler streams-app the-handler)

creese17:10:48

We also wait 60 seconds before we shut down the app.
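
A rough sketch of what such a handler might look like, assuming a Kafka Streams version where setUncaughtExceptionHandler accepts a java.lang.Thread$UncaughtExceptionHandler (true of the releases current at the time of this conversation). The function name `make-handler` and its `delay-ms` and `shutdown-fn` parameters are invented for illustration, and whether the 60-second wait lives inside the handler is a guess rather than a description of the accounting system.

```clojure
(defn make-handler
  "Returns a Thread$UncaughtExceptionHandler that logs the exception to
  stderr, sleeps for delay-ms milliseconds, then calls shutdown-fn.
  Both parameters are hypothetical; pick whatever suits your app."
  [delay-ms shutdown-fn]
  (reify Thread$UncaughtExceptionHandler
    (uncaughtException [_ thread ex]
      ;; Log which thread died and why.
      (binding [*out* *err*]
        (println "Uncaught exception in" (.getName ^Thread thread)
                 "-" (.getMessage ^Throwable ex)))
      ;; Give in-flight work and log shipping some time before stopping.
      (Thread/sleep (long delay-ms))
      (shutdown-fn))))

;; Usage, assuming streams-app is a KafkaStreams instance and stop-app!
;; is your own (hypothetical) shutdown function:
;; (.setUncaughtExceptionHandler streams-app (make-handler 60000 stop-app!))
```

The handler runs on the thread that died, so anything slow or blocking in `shutdown-fn` is worth testing against your own Kafka Streams version.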