This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-07-21
Channels
- # aws (14)
- # babashka (35)
- # beginners (163)
- # boot (2)
- # calva (5)
- # cider (30)
- # clojure (143)
- # clojure-colombia (1)
- # clojure-europe (5)
- # clojure-nl (11)
- # clojure-spec (1)
- # clojure-uk (16)
- # clojurescript (71)
- # community-development (2)
- # conjure (1)
- # cursive (6)
- # datomic (30)
- # duct (4)
- # figwheel-main (11)
- # fulcro (28)
- # graalvm (3)
- # graphql (23)
- # hoplon (36)
- # jackdaw (24)
- # kaocha (16)
- # lambdaisland (1)
- # leiningen (4)
- # luminus (3)
- # meander (4)
- # observability (1)
- # off-topic (10)
- # pathom (5)
- # re-frame (27)
- # reitit (7)
- # remote-jobs (1)
- # sci (17)
- # shadow-cljs (22)
- # spacemacs (14)
- # sql (61)
- # testing (3)
- # tools-deps (27)
- # vim (2)
- # xtdb (18)
- # yada (2)
Hi everyone
is there a way to get kafka message timestamp for example in map or for-each ?
i see map and for-each only take 2 args which are key and value
but what about the additional keys
I believe you have to drop into the slightly lower level functions using process
or transform
to get the other info, last I checked that's what you needed to do in pure Kafka Streams as well
Then you get given the ProcessorContext
object which has the timestamp among other things
(reify Transformer
(init [_ _])
(close [_])
(transform [ctx k v]
(key-value [k [v (.timestamp ctx)]])))
i have got an error: java.lang.IllegalArgumentException: No matching field found: timestamp for class
can you help here?
Ahh sure, it's a bit confusing because the java expect some mutable state. The important bit that's wrong for your code is that the first argument passed to transform
function you've reified is this
in java terms, so the field .timestamp
doesn't exist because you are calling it on your reified object.
Processor context actually get's passed in as the second arg to init
and then gets mutated as kakfa reads. I have this in my code:
(defn transformer
[f]
(let [ctx (atom nil)]
(reify Transformer
(init [_ context]
(reset! ctx context))
(close [_])
(transform [_ k v]
(some-> (f @ctx k v)
jl/key-value)))))
and that way I can provide a clojure function which does accept the useful (imo) 3 args (fn [ctx k v] ...)
that's good, but (f @ctx k v) that function will return what i need [k [v timestamp]] right ?
@dstephens is there a better way you prefer to do that ?
i meant to return timestamp to make it available for next processing by map or for-each
yeah, if you give back [k [v timestamp]]
then your value is now a vector of your old value and the kafka timestamp (whichever one you configured), the rest is dependent on your design, personally if the time is required by my business logic I'd probably put the timestamp on v
manually when it gets made but I have no idea if that is 'good practice'
Personally I'd rather not rely on location in a vector either, so I'd probably give back [k {:value v :metadata {:timestamp timestamp}}]
at least, so that you can extend it later more easily
and then in your map
you can have your function (fn [[k v]] (get-in v [:metadata :timestamp]))
or whatever you want
Depends on what serialisation you use as well, lots of variables I'm afraid!
great advice
thank you so much
Glad I could help 😊 I should say as a disclaimer I'm not one of the maintainers of jackdaw or anything so while I think my technical advice is correct you should listen to my design advice with reasonable doubt and read lots of confluent docs on that stuff!