Fork me on GitHub
#jackdaw
<
2020-10-26
>
markbastian16:10:57

What is the reason for the topic being passed in to the tail position in https://github.com/FundingCircle/jackdaw/blob/master/test/jackdaw/streams_test.clj#L891-L894? I am looking at the arities https://cljdoc.org/d/fundingcircle/jackdaw/0.6.9/api/jackdaw.streams#aggregate for aggregate and I really only understand the first one (aggregate kgrouped initializer-fn adder-fn). Can anyone shed some light for me on what the others do? I am trying to get something like this to work. When I take out the topic in the last position I get garbage for a.

(-> (js/kstream builder topic1)
          (js/group-by (fn [[key {:keys [group] :as record}]] group))
          (js/aggregate
            (constantly (do
                          (pp/pprint "I AM AT THE START!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                          {:size 0}))
            (fn [a [key {:keys [size] :as b}]]
              (pp/pprint {:a a :b b}) (update a :size + size)))
          js/to-kstream
          (js/peek (fn [v] (pp/pprint ["peek" v])))
          (js/to topic2))

dakra09:10:21

You probably have to specify value-serde so it's parsed correctly. Try adding a topic-config as last argument. Something like {:topic-name "aggr-table" :value-serde (jackdaw.serdes/edn-serde)})

markbastian16:10:53

I figured it might be serde related. I'll investigate further. Thanks!

markbastian16:10:36

Also, I read more about Kafka streams in general and I think I now understand the remaining params. They are relevant to if you are using a KStream vs. a KTable.