I think I understand what you mean. You'd like to do something like...

(defn foo
  [builder dispatch-fn]
  (-> (k/stream builder foo)
      (k/map transform-foo)
      (k/filter foo-valid?)
      (k/process! (partial dispatch-processor dispatch-fn))))
...but for your dispatch processor to work, you'd need to call (.addSink sink-id topic key-serializer value-serializer processor-id) for each sink that dispatch-processor writes to. The processor-id assigned by the DSL is generated for you rather than chosen by you, which makes it difficult to add a sink with the correct "parent processor".

I think the solution is to use .addProcessor rather than k/process!, since that lets you assign the processor's name yourself.
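
To make that concrete, here is a rough, untested sketch of the idea using plain Java interop against the Kafka Streams Processor API (the `org.apache.kafka.streams.processor.api` package, so Kafka 2.7 or later is assumed). The topic names, node names, and the `route` function are all made up for illustration, and it builds a bare `Topology` instead of going through the `k/` DSL, so treat it as a starting point, not as how the library is meant to be used.

```clojure
(ns example.dispatch
  (:require [clojure.string :as str])
  (:import (org.apache.kafka.common.serialization StringDeserializer StringSerializer)
           (org.apache.kafka.streams Topology)
           (org.apache.kafka.streams.processor.api Processor ProcessorContext
                                                   ProcessorSupplier)))

(defn- names
  "Builds the String[] expected by the varargs parameters (topics, parent names)."
  ^"[Ljava.lang.String;" [& xs]
  (into-array String xs))

(defn route
  "Hypothetical dispatch-fn: returns the NAME of the sink node for a record."
  [_key value]
  (if (str/starts-with? value "a") "sink-a" "sink-b"))

(defn dispatch-processor-supplier
  "Wraps dispatch-fn in a processor that forwards each record to the child
  node whose name dispatch-fn returns."
  ^ProcessorSupplier [dispatch-fn]
  (reify ProcessorSupplier
    (get [_]
      ;; Each processor instance keeps the context it receives in init.
      (let [ctx (volatile! nil)]
        (reify Processor
          (init [_ context]
            (vreset! ctx context))
          (process [_ record]
            (let [^ProcessorContext c @ctx
                  ^String child (dispatch-fn (.key record) (.value record))]
              ;; forward(record, childName) sends to one named child only.
              (.forward c record child))))))))

(defn build-topology
  []
  (doto (Topology.)
    ;; "foo-source", "dispatch", "sink-a" and "sink-b" are names we choose,
    ;; so each sink can list "dispatch" as its parent.
    (.addSource "foo-source" (StringDeserializer.) (StringDeserializer.)
                (names "foo"))
    (.addProcessor "dispatch" (dispatch-processor-supplier route)
                   (names "foo-source"))
    (.addSink "sink-a" "topic-a" (StringSerializer.) (StringSerializer.)
              (names "dispatch"))
    (.addSink "sink-b" "topic-b" (StringSerializer.) (StringSerializer.)
              (names "dispatch"))))
```

Because "dispatch" is a name you picked, every `.addSink` call can refer to it as its parent, which is exactly what the generated id prevents. Calling `(.describe (build-topology))` should show both sinks hanging off the "dispatch" node.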