core.async.flow: How can I attach transducers to in/out channels?
The docstring of create-flow says there's
:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
and xform have their meanings per core.async/chan
the default is {:buf-or-n 10}
Adapting the example from https://clojurians.slack.com/archives/C05423W6H/p1744134503513079?thread_ts=1742495587.307929&cid=C05423W6H like this...
(def gdef
  {:procs {:looper {:proc (-> #(do (prn %) (when (< % 7) (inc %))) f/lift1->step f/process)}}
   :chan-opts {[:looper :out] {:xform (map inc)}}
   :conns [[[:looper :out] [:looper :in]]]})
(def g (f/create-flow gdef))
(f/start g)
(f/resume g)
(f/inject g [:looper :in] [1])
... it still counts from 1 to 7, no inc applied.
:chan-opts is mentioned in the docstring, so I guess it's not implemented?
partition-by values between processes.
there is code there in prep-proc with the intent to support channel xforms, but I will look further
oh, I think you're putting the chan-opts in the wrong place - they go inside the proc map
(def gdef
  {:procs {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)
           :chan-opts {:out {:xform (map inc)}}}
   :conns [[[:looper :out] [:looper :in]]]})
(I edited that in place, sorry if the braces don't all balance there, but that's the idea)
I think yours is also missing the proc name map level so this is still not right
(def gdef
  {:procs {:looper
           {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)
            :chan-opts {:out {:xform (map inc)}}}}
   :conns [[[:looper :out] [:looper :in]]]})
?
Oh, yeah, totally misread the docstring. And the missing name was a copy/paste fail. Now I have:
(do (def gdef
      {:procs {:looper
               {:proc (-> #(do (prn %) (when (< % 7) (inc %))) f/lift1->step f/process)
                :chan-opts {:out {:xform (map inc)}}}}
       :conns [[[:looper :out] [:looper :in]]]})
    (def g (f/create-flow gdef))
    (f/start g)
    (f/resume g)
    (f/inject g [:looper :in] [1])
    (Thread/sleep 100)
    (f/stop g))
But it still does not seem to work. Getting closer though, and will look further into this later :)
Thanks for getting me on the right track!
Hah! This prints 2,4,6,8.
Works only on :in not :out (at least in this scenario, need to figure out why), and :buf-or-n is mandatory with :xform
(do (def gdef
      {:procs {:looper
               {:proc (-> #(do (prn %) (when (< % 7) (inc %))) f/lift1->step f/process)
                :chan-opts {:in {:buf-or-n 10 :xform (map inc)}}}}
       :conns [[[:looper :out] [:looper :in]]]})
    (def g (f/create-flow gdef))
    (f/start g)
    (f/resume g)
    (f/inject g [:looper :in] [1])
    (Thread/sleep 100)
    (f/stop g))
Interesting:
:chan-opts {:in {:xform (map inc)}} throws "Assert failed: buffer must be supplied when transducer is", as it should
:chan-opts {:out {:xform (map inc)}} does not!
well it's trying to match things up, I guess it's driving from the ins more than the outs. there are still some open questions in how the connecting channels are constructed so this is maybe a good question
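For reference, the buffer requirement comes from core.async/chan itself, whose semantics :chan-opts is documented to follow. A minimal sketch using plain core.async only (no flow involved; the names c, result and error-message are just for illustration), showing a transducer applied on a buffered channel and the assertion raised when the buffer is left out:

```clojure
(require '[clojure.core.async :as a])

;; With a buffer, the transducer is applied to each value put on the channel.
(def c (a/chan 10 (map inc)))
(a/>!! c 1)
(def result (a/<!! c)) ;; => 2

;; Without a buffer, a/chan itself rejects the transducer with an assertion.
(def error-message
  (try
    (a/chan nil (map inc))
    nil
    (catch AssertionError e
      (.getMessage e))))
```

This says nothing about why the :out case above is silently accepted; it only shows where the :in assertion originates.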
Just tried https://github.com/clojure/core.async.flow-monitor/ and it was so exciting to pause a downstream process and see backpressure in action!
release the dam!
between this and #flow-storm it's always great to see clojure at the peak of introspection/debug tooling
I’ve started a repo with some c.a.flow examples (for now it’s the original example by Rich adapted to the latest version), I’ll try to add more: https://github.com/pixelated-noise/async-flow-example/tree/master
is self-send an anti-pattern in any way?
should work (as long as you are not doing pathological things)
I’m using it to handle pagination, the process notifies itself about fetching the next page
seems ok to me
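A rough sketch of the self-send pagination idea, written as a plain step function so it can be tried without starting a flow. It assumes the four-arity step-fn contract (describe, init, transition, transform) described in the flow/process docs, so check it against the version you are on. fetch-page, the :page/:items/:next port names and the :pager proc id are all made up for illustration:

```clojure
;; Hypothetical stand-in for a real paged API: three pages, each pointing at the next.
(defn fetch-page [n]
  {:items     [(str "item-" n)]
   :next-page (when (< n 3) (inc n))})

(defn pager
  ;; describe: the ports this proc exposes
  ([] {:ins  {:page "page number to fetch"}
       :outs {:items "items from the fetched page"
              :next  "next page number, to be routed back to :page"}})
  ;; init: the args become the initial state
  ([args] args)
  ;; transition: lifecycle changes leave the state untouched
  ([state _transition] state)
  ;; transform: fetch one page, emit its items, and ask for the next page if there is one
  ([state _in page-number]
   (let [{:keys [items next-page]} (fetch-page page-number)]
     [state (cond-> {:items items}
              next-page (assoc :next [next-page]))])))

;; Wiring, as data only: the proc's :next output feeds its own :page input.
(def pager-conns [[[:pager :next] [:pager :page]]])
```

Calling (pager {} :page 1) returns [{} {:items ["item-1"] :next [2]}], and the last page simply omits :next, which is what ends the loop.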
i have a project at work with different data processing jobs that are all deployed together. would there be anything wrong with putting multiple disjoint graphs into one flow? it looks ok at first glance but just want to make sure i didn't miss anything. i would put them into different flows, however i would like to use flow monitor, and binding a port for each flow isn't the easiest path with the infra i deploy to.
nothing wrong I don't think, as long as the overall lifecycle is ok to be combined
excellent! i think some namespaced keys and functions to merge the flows will make this pretty easy.
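One way the merge could look, as a sketch over plain config maps. merge-flow-defs, the :billing/... and :reports/... proc ids and the :placeholder values (standing in for real flow/process results) are hypothetical. It also assumes namespaced keywords are acceptable as proc ids, which this thread does not confirm, and it only carries over :procs and :conns:

```clojure
(defn merge-flow-defs
  "Combines several flow config maps into one.
  Throws if two definitions use the same proc id."
  [& defs]
  (let [ids  (mapcat (comp keys :procs) defs)
        dups (for [[id n] (frequencies ids) :when (> n 1)] id)]
    (when (seq dups)
      (throw (ex-info "Duplicate proc ids" {:ids (vec dups)})))
    {:procs (apply merge (map :procs defs))
     :conns (vec (mapcat :conns defs))}))

;; Two disjoint graphs, kept apart by namespaced proc ids.
(def billing-def
  {:procs {:billing/source {:proc :placeholder}
           :billing/sink   {:proc :placeholder}}
   :conns [[[:billing/source :out] [:billing/sink :in]]]})

(def reports-def
  {:procs {:reports/source {:proc :placeholder}
           :reports/sink   {:proc :placeholder}}
   :conns [[[:reports/source :out] [:reports/sink :in]]]})

(def combined (merge-flow-defs billing-def reports-def))
```

The combined map would then go to create-flow once the placeholders are replaced with real procs; the duplicate check is there because a plain merge would silently drop one of two procs sharing an id.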
Rendering of input buffers appears to be incorrect (or at least confusing) when doing many-to-one. A flow definition like:
{:procs {:scheduler-1 {:args {:wait 1000}
                       :proc (flow/process #'scheduler)}
         :scheduler-2 {:args {:wait 5000}
                       :proc (flow/process #'scheduler)}
         :notifier {:proc (flow/process #'notifier)
                    :chan-opts {:in {:buf-or-n (a/sliding-buffer 3)}}}}
 :conns [[[:scheduler-1 :out] [:notifier :in]]
         [[:scheduler-2 :out] [:notifier :in]]]}
renders the common notifier buffer for both scheduler instances
There are two channels there so I think that's correct.
There is a single port on notifier but different channels between the schedulers and the notifier
If one scheduler is resumed while the other scheduler and notifier remain paused, both buffers appear to fill simultaneously even though nothing is emitted on scheduler-2's out channel.
I'm a bit ignorant when it comes to core.async and obviously new to core.async.flow. Based on the config I expected a single 3-slot buffer on notifier's input shared by all sources, which seems to match the observed behavior. But based on the monitor rendering I would naively expect the buffers on each channel to be distinct and unpausing scheduler-1 to have no effect on scheduler-2's buffer.
@jarrodctaylor will take a look today and get back to you
This is an issue with the visualizer. The visualizer is drawing multiple boxes that are really the same channel. I will work on getting it to render more accurately. Thanks for testing it out and providing feedback @chuck.cassel
yeah, I was wrong above - sorry about that! (I'm still learning flow too :)
Thank you both. Looking forward to playing with it more myself and excited to see what comes out of the community.
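The "same channel" behaviour can be pictured with plain core.async. This is only an analogy under the assumption that both connections end up on one input channel, as described above; it is not flow's actual wiring code, and notifier-in and remaining are illustrative names:

```clojure
(require '[clojure.core.async :as a])

;; One input channel with a 3-slot sliding buffer, shared by every source.
(def notifier-in (a/chan (a/sliding-buffer 3)))

;; Two "schedulers" write to the same channel; nothing is reading yet.
(doseq [msg [[:scheduler-1 1] [:scheduler-2 1] [:scheduler-1 2] [:scheduler-1 3]]]
  (a/>!! notifier-in msg))

;; Close and drain to see what the shared buffer kept.
(a/close! notifier-in)
(def remaining (a/<!! (a/into [] notifier-in)))
;; => [[:scheduler-2 1] [:scheduler-1 2] [:scheduler-1 3]]
```

Four puts into three slots means the oldest message is dropped no matter which source sent it, which matches the expectation of a single shared buffer on notifier's input.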