#jackdaw
2019-07-23
xi09:07:52

Thanks @cddr! I'm blind, I must have looked at every other file but that one >.< join doesn't take a topic-config? Although, think I'm being dumb (as well as blind! In my defence, I'm sick) - I should use to/through.

cddr10:07:45

> join doesn't take a topic-config?

Ah, I guess there are a few other variable names which imply a topic-config

cddr10:07:03

What are you trying to do?

xi10:07:47

I'm trying to rekey two topics, join them then run a processor

xi10:07:58

So far I have something like (left-join (-> (kstream ...) (select-key ...)) (-> (kstream ...) (group-by ...) (aggregate ...)))

xi10:07:52

which I plan to pass into process!, but I need to add a state store and I can't see how to in Jackdaw

xi10:07:29

My team picked the processor API initially for simplicity then got rushed along adding features and we've only just remembered to revisit the KStreams API

xi10:07:48

So I need to make them work together as we transition

xi11:07:08

I can have a go at the state store with streams-builder* but that's not ideal

cddr11:07:15

So that all sounds totally reasonable and I think it is supported by jackdaw. The process! function takes a function whose first parameter is a ProcessorContext. This is where you can get access to the state store and perform side effects. If you plan to use a state store, I think you need to add it to the topology at build time using methods on the StreamsBuilder, and as you've identified you can get that from the streams-builder* function.
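For readers following along, here is a rough sketch of the shape cddr describes: a processor function that receives the ProcessorContext first and looks up a state store from it. It assumes `j/process!` accepts the stream, the function, and a collection of store names, and that the function is called with the context, key, and value. The store name "my-store", the counting logic, and the names `count-by-key!` and `attach-processor` are hypothetical, chosen only for illustration.

```clojure
(ns example.processor
  (:require [jackdaw.streams :as j])
  (:import [org.apache.kafka.streams.processor ProcessorContext]
           [org.apache.kafka.streams.state KeyValueStore]))

(defn count-by-key!
  "Hypothetical processor fn: counts how many records were seen per key.
  Assumes a key-value store named \"my-store\" was added to the topology
  at build time, with a value serde that can handle the stored counts."
  [^ProcessorContext ctx k _v]
  (let [^KeyValueStore store (.getStateStore ctx "my-store")
        n (inc (or (.get store k) 0))]
    ;; Side effect: write the updated count back to the store.
    (.put store k n)))

(defn attach-processor
  "Assumption: process! takes the stream, the processor fn, and the
  names of the state stores the processor needs access to."
  [joined-stream]
  (j/process! joined-stream count-by-key! ["my-store"]))
```

The store still has to be registered on the underlying StreamsBuilder before the topology is built, otherwise the lookup by name would be expected to fail at runtime.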

xi11:07:14

Sounds like I'm on the right track then - thanks for the help

cddr11:07:43

I haven't thought too much about how to make it easier to get access to state stores. We do it a bit ourselves and just drop down to the interop but I'm sure we could improve on the "ergonomics" in light of real-world examples. Let me know if you have any thoughts on how you'd like it to work.

xi13:07:25

I got my stream to build. I used this:

(defn kvsb
  [store]
  (Stores/keyValueStoreBuilder
   (Stores/persistentKeyValueStore store)
   store-serde store-serde))

(.addStateStore (j/streams-builder* builder) (k/kvsb "my-store"))

xi13:07:58

It'd be nice to be able to do more like: (add-state-store builder (persistent-key-value-store "my-store" serde serde)) (especially if that returned builder, to maintain threadability)

xi13:07:50

Or maybe (add-state-store builder (with (persistent-key-value-store "my-store") serde serde))
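A minimal sketch of how the first suggested form could be written on top of the interop call xi already used. Neither `add-state-store` nor `persistent-key-value-store` is part of jackdaw as far as this conversation shows; both are hypothetical helpers here, and the String serdes in the usage are placeholders.

```clojure
(ns example.state-store
  (:require [jackdaw.streams :as j])
  (:import [org.apache.kafka.common.serialization Serdes]
           [org.apache.kafka.streams StreamsBuilder]
           [org.apache.kafka.streams.state Stores]))

(defn persistent-key-value-store
  "Hypothetical helper: returns a StoreBuilder for a persistent
  key-value store with the given name and serdes."
  [store-name key-serde value-serde]
  (Stores/keyValueStoreBuilder
   (Stores/persistentKeyValueStore store-name)
   key-serde
   value-serde))

(defn add-state-store
  "Hypothetical helper: registers store-builder on the underlying
  StreamsBuilder and returns the jackdaw builder, so calls can be
  threaded with ->."
  [builder store-builder]
  (.addStateStore ^StreamsBuilder (j/streams-builder* builder) store-builder)
  builder)

(defn example-builder
  "Usage sketch: a fresh jackdaw builder with one store registered."
  []
  (-> (j/streams-builder)
      (add-state-store
       (persistent-key-value-store "my-store" (Serdes/String) (Serdes/String)))))
```

Returning `builder` rather than the result of `.addStateStore` is what keeps the call threadable, since the interop method returns the raw StreamsBuilder and not the jackdaw wrapper.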