#jackdaw
2019-10-01
lxsli 13:10:24

Hello again... I'm trying to mix the Streams DSL and the Processor API. Specifically, I want to add a Processor to the end of my topology that sends messages to multiple topics. The issue I'm hitting is that Streams (/Jackdaw) gives my processor an auto-generated name, so I have no easy way to call Topology.addSink(String processor-name, ...).

lxsli 13:10:09

I'd prefer to avoid introspecting the topology to dig out that name; I can't see a way to do that robustly.

cddr 13:10:31

Are you using the Processor because you need some other feature not provided by the DSL? I think you can achieve what you've described with something like this...

(fn [builder]
  (let [foo (k/stream builder foo-topic)]
    (-> foo (k/map identity) (k/to foo-1))
    (-> foo (k/map identity) (k/to foo-2))
    (-> foo (k/map identity) (k/to foo-3))
    builder))

lxsli 13:10:54

This is related to a wider concern: if one message (a command) is broken up into many events and those events are committed separately, the command may get partially executed in case of downtime. Otherwise I could branch.

cddr 13:10:07

Ah ok. So you want to have control over when the commit happens. :thinking_face:

lxsli 13:10:12

Yep exactly

lxsli 13:10:44

;; Requires (:import [org.apache.kafka.streams.processor To]) in the ns form.
(defn dispatch-processor
  "Why not `flat-map f` beforehand instead of passing it in here? That would
  split one message into many and therefore commit many times instead of once."
  [f ctx k v]
  (doseq [[topic msg-key msg] (f [k v])]
    (.forward ctx msg-key msg (To/child topic)))
  (.commit ctx))
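
To make the contract of `f` concrete: the `doseq` above expects `f` to take a `[k v]` pair and return a sequence of `[topic msg-key msg]` triples. The following is only an illustrative sketch; `command->events`, the topic names, and the message shapes are all hypothetical and do not come from the conversation.

```clojure
;; Hypothetical `f` for dispatch-processor: one command in, several
;; [topic msg-key msg] triples out. All names here are made up.
(defn command->events
  [[k {:keys [account amount]}]]
  [["ledger-events"       account {:type :debited  :amount amount :command-id k}]
   ["notification-events" account {:type :notified :command-id k}]])

;; Every triple comes from the same input record, so the processor can
;; forward all of them before requesting a single commit.
(def example-events
  (command->events ["cmd-1" {:account "acct-42" :amount 10}]))
```

Keeping `f` a pure function means the fan-out logic can be checked at the REPL without a running Kafka cluster.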

lxsli 13:10:56

that's what I'm trying to attach

lxsli 13:10:18

Open to better ways to achieve this
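
One possible way around the generated name is to build this part of the topology with the low-level `Topology` API instead of the DSL, since every node name is then chosen by the caller. The sketch below is not from the conversation. It assumes the Kafka Streams 2.x Processor API (the one `dispatch-processor` above is written against), default serdes configured elsewhere, and hypothetical node names `"source"` and `"dispatch"`. Each sink is named after its topic so that `(To/child topic)` resolves to it.

```clojure
(ns example.named-topology
  (:import [org.apache.kafka.streams Topology]
           [org.apache.kafka.streams.processor Processor ProcessorSupplier To]))

(defn strs
  "Builds the String[] that the Java varargs parameters expect."
  ^"[Ljava.lang.String;" [& xs]
  (into-array String xs))

(defn dispatch-supplier
  "Wraps `f` in a ProcessorSupplier. `f` takes [k v] and returns
  [topic msg-key msg] triples, as in dispatch-processor."
  [f]
  (reify ProcessorSupplier
    (get [_]
      (let [ctx (volatile! nil)]
        (reify Processor
          (init [_ context] (vreset! ctx context))
          (process [_ k v]
            ;; Forward each message to the child node named after its topic,
            ;; then request one commit for the whole input record.
            (doseq [[topic msg-key msg] (f [k v])]
              (.forward @ctx msg-key msg (To/child topic)))
            (.commit @ctx))
          (close [_]))))))

(defn build-topology
  "Source -> named dispatch processor -> one sink per output topic."
  [f in-topic out-topics]
  (let [topology (doto (Topology.)
                   (.addSource "source" (strs in-topic))
                   (.addProcessor "dispatch" (dispatch-supplier f) (strs "source")))]
    (doseq [t out-topics]
      ;; addSink(name, topic, parentNames...): sink name equals topic name.
      (.addSink topology ^String t ^String t (strs "dispatch")))
    topology))
```

Note that `.commit` only requests a commit at the next opportunity. Whether the forwarded events become visible atomically still depends on the application's processing guarantee setting, which this sketch does not address.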