#jackdaw
2020-07-21
abdullahibra12:07:03

Is there a way to get the Kafka message timestamp, for example in map or for-each?

abdullahibra12:07:17

i see map and for-each only take 2 args which are key and value

abdullahibra12:07:24

but what about the additional fields, such as the timestamp?

Daniel Stephens12:07:22

I believe you have to drop into the slightly lower level functions using process or transform to get the other info, last I checked that's what you needed to do in pure Kafka Streams as well

Daniel Stephens12:07:12

Then you get given the ProcessorContext object which has the timestamp among other things

abdullahibra12:07:02

that's really good advice, thank you 🙂

abdullahibra14:07:38

(reify Transformer
   (init [_ _])
   (close [_])
   (transform [ctx k v]
     (key-value [k [v (.timestamp ctx)]])))

abdullahibra14:07:13

i have got an error: java.lang.IllegalArgumentException: No matching field found: timestamp for class

abdullahibra14:07:23

can you help here?

Daniel Stephens14:07:27

Ahh sure, it's a bit confusing because the Java API expects some mutable state. The important bit that's wrong in your code is that the first argument passed to the transform function you've reified is 'this' in Java terms, so the field .timestamp doesn't exist because you are calling it on your reified object. The ProcessorContext actually gets passed in as the second arg to init and then gets mutated as Kafka reads. I have this in my code:

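;; jl is presumably an alias for jackdaw.streams.lambdas
(defn transformer
  "Wraps f, a function of [ctx k v], in a Kafka Streams Transformer."
  [f]
  (let [ctx (atom nil)]
    (reify Transformer
      ;; init receives the ProcessorContext; keep it for later calls
      (init [_ context]
        (reset! ctx context))
      (close [_])
      ;; the first arg is the reified object itself, so it is ignored
      (transform [_ k v]
        ;; a nil result from f emits nothing
        (some-> (f @ctx k v)
                jl/key-value)))))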
and that way I can provide a clojure function which does accept the useful (imo) 3 args (fn [ctx k v] ...)
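
The point about the first argument of a reified method being 'this' can be checked without Kafka. The sketch below is only an illustration: java.util.function.BiFunction stands in for Transformer, and the name adder is made up for the example.

```clojure
;; Illustration only: BiFunction stands in for Kafka's Transformer.
(def adder
  (reify java.util.function.BiFunction
    ;; The first parameter is the reified object itself ("this"),
    ;; followed by the two arguments declared by BiFunction.apply.
    (apply [this a b]
      {:this-is-bifunction? (instance? java.util.function.BiFunction this)
       :sum (+ a b)})))

;; The caller passes only the two declared arguments.
(.apply adder 1 2)
```

In the same way, (transform [ctx k v] ...) binds ctx to the reified Transformer rather than to a ProcessorContext, which is why .timestamp cannot be found on it.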

abdullahibra14:07:32

that's good, but the function called as (f @ctx k v) should return what I need, [k [v timestamp]], right?

abdullahibra14:07:55

@dstephens is there a better way you prefer to do that ?

abdullahibra14:07:34

I meant returning the timestamp so that it's available to later processing by map or for-each

Daniel Stephens14:07:58

Yeah, if you give back [k [v timestamp]] then your value is now a vector of your old value and the Kafka timestamp (whichever one you configured). The rest depends on your design. Personally, if the time is required by my business logic I'd probably put the timestamp on v manually when it gets made, but I have no idea if that is 'good practice'.

Daniel Stephens14:07:35

Personally I'd rather not rely on location in a vector either, so I'd probably give back [k {:value v :metadata {:timestamp timestamp}}] at least, so that you can extend it later more easily

Daniel Stephens14:07:43

and then in your map you can have your function (fn [[k v]] (get-in v [:metadata :timestamp])) or whatever you want
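
A minimal sketch of that map-shaped value in plain Clojure, with no Kafka involved. The names wrap-with-timestamp and record-timestamp, the example key and the example timestamp are made up for illustration; in a real topology the timestamp would come from the ProcessorContext.

```clojure
(defn wrap-with-timestamp
  "Returns [k v'] where v' carries the old value plus timestamp metadata."
  [k v timestamp]
  [k {:value v :metadata {:timestamp timestamp}}])

(defn record-timestamp
  "Reads the timestamp back out of a [k v'] pair, as a later map step would."
  [[_k v]]
  (get-in v [:metadata :timestamp]))

;; Hypothetical record: the key, value and timestamp are made-up data.
(def example
  (wrap-with-timestamp "order-1" {:amount 10} 1595340000000))

(record-timestamp example)
```

Keeping the metadata under its own key means more fields can be added later without changing code that reads :value.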

Daniel Stephens14:07:11

Depends on what serialisation you use as well, lots of variables I'm afraid!

abdullahibra14:07:20

thank you so much

Daniel Stephens14:07:44

Glad I could help 😊 I should say as a disclaimer that I'm not one of the maintainers of jackdaw or anything, so while I think my technical advice is correct, you should take my design advice with reasonable doubt and read lots of the Confluent docs on that stuff!