Fork me on GitHub

Hello again... I'm trying to mix the streams + processor APIs. Specifically I want to add a Processor to the end of my topology which sends messages to multiple topics. The issue I'm hitting is that Streams (/Jackdaw) gives my processor a random name, so I have no easy way to call Topology.addSink(String processor-name, ...).


I'd prefer to avoid introspecting the topology to dig out that name, I can't see a way to do that robustly


Are you using the Processor because you need some other feature not provided by the DSL? I think you can achieve what you've described with something like this...

(fn [builder]
  (let [foo (k/stream builder foo-topic)]
     (-> foo (k/map identity) (k/to foo-1))
     (-> foo (k/map identity) (k/to foo-2))
     (-> foo (k/map identity) (k/to foo-3))


This is related to a wider concern that if 1 message (a command) is broken up into many events and those events are committed separately, that a command may get partially executed in case of downtime. Otherwise I could branch.


Ah ok. So you want to have control over when the commit happens. :thinking_face:


Yep exactly


(defn dispatch-processor
  "Why not `flat-map f` beforehand instead of passing it in here? That would
  split one message into many and therefore commit many times instead of once."
  [f ctx k v]
  (doseq [[topic msg-key msg] (f [k v])]
    (.forward ctx msg-key msg (To/child topic)))
  (.commit ctx))


that's what I'm trying to attach


Open to better ways to achieve this