core-async

Karl Xaver 2025-04-28T18:37:01.551789Z

core.async.flow: How can I attach transducers to in/out channels? The docstring of create-flow says there's

:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
               and xform have their meanings per core.async/chan
               the default is {:buf-or-n 10}
Adapting the https://clojurians.slack.com/archives/C05423W6H/p1744134503513079?thread_ts=1742495587.307929&cid=C05423W6HRHs like this...
(def gdef
  {:procs     {:looper {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)}}
   :chan-opts {[:looper :out] {:xform (map inc)}}
   :conns     [[[:looper :out] [:looper :in]]]})

(def g (f/create-flow gdef))
(f/start g)
(f/resume g)
(f/inject g [:looper :in] [1])
... it still counts from 1 to 7, no inc applied. Grepping the impl I only found :chan-opts mentioned in the docstring, so I guess it's not implemented? If so, whats recommended to achieve the same goal? In particular, I'd like to partition-by values between processes.

Alex Miller (Clojure team) 2025-04-28T18:44:54.107809Z

there is code there in prep-proc with the intent to support channel xforms, but I will look further

Alex Miller (Clojure team) 2025-04-28T18:52:00.578909Z

oh, I think you're putting the chan-opts in the wrong place - they go inside the proc map

Alex Miller (Clojure team) 2025-04-28T18:53:09.127349Z

(def gdef
  {:procs     {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)
               :chan-opts {:out {:xform (map inc)}}}}
   :conns     [[[:looper :out] [:looper :in]]]})

Alex Miller (Clojure team) 2025-04-28T18:53:38.668959Z

(I edited that in place, sorry if the braces don't all balance there, but that's the idea)

Alex Miller (Clojure team) 2025-04-28T18:58:01.413239Z

I think yours is also missing the proc name map level so this is still not right

Alex Miller (Clojure team) 2025-04-28T18:58:42.973269Z

(def gdef
  {:procs    {:looper
                {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)
                 :chan-opts {:out {:xform (map inc)}}}}}
   :conns     [[[:looper :out] [:looper :in]]]})
?

Karl Xaver 2025-04-28T19:03:52.276129Z

Oh, yeah, totally misread the docstring. And the missing name was a copy/paste fail. Now I have:

(do (def gdef
        {:procs {:looper
                 {:proc      (-> #(do (prn %) (when (< % 7) (inc %))) f/lift1->step f/process)
                  :chan-opts {:out {:xform (map inc)}}}}
         :conns [[[:looper :out] [:looper :in]]]})
      
      (def g (f/create-flow gdef))
      (f/start g)
      (f/resume g)
      (f/inject g [:looper :in] [1])
      (Thread/sleep 100)
      (f/stop g))
But still does not seem to work. Getting closer though and will look further into this later :) Thanks for getting me on the right track!

Karl Xaver 2025-04-28T19:16:00.634449Z

Hah! This prints 2,4,6,8. Works only on :in not :out (at least in this scenario, need to figure out why), and :buf-or-n is mandatory with :xform

(do (def gdef
        {:procs {:looper
                 {:proc      (-> #(do (prn %) (when (< % 7) (inc %))) f/lift1->step f/process)
                  :chan-opts {:in {:buf-or-n 10 :xform (map inc)}}}}
         :conns [[[:looper :out] [:looper :in]]]})
      
      (def g (f/create-flow gdef))
      (f/start g)
      (f/resume g)
      (f/inject g [:looper :in] [1])
      (Thread/sleep 100)
      (f/stop g))

Karl Xaver 2025-04-28T19:33:45.493419Z

Interesting: :chan-opts {:in {:xform (map inc)}} throws "Assert failed: buffer must be supplied when transducer is", as it should :chan-opts {:out {:xform (map inc)}} does not!

Alex Miller (Clojure team) 2025-04-28T19:38:44.539709Z

well it's trying to match things up, I guess it's driving from the ins more than the outs. there are still some open questions in how the connecting channels are constructed so this is maybe a good question

stathissideris 2025-04-28T20:00:38.013769Z

Just tried https://github.com/clojure/core.async.flow-monitor/ it was so exciting to pause a downstream process and see backpressure in action!

Alex Miller (Clojure team) 2025-04-28T20:08:33.323999Z

release the dam!

😄 1
Samuel Ludwig 2025-04-28T20:09:44.712369Z

between this and #flow-storm its always great to see clojure at the peak of introspection/debug tooling

stathissideris 2025-04-28T20:09:57.248659Z

I’ve started a repo with some c.a.flow examples (for now it’s the original example by Rich adapted to the latest version), I’ll try to add more: https://github.com/pixelated-noise/async-flow-example/tree/master

🙌 3
stathissideris 2025-04-28T20:18:57.048289Z

is self-send an anti-pattern in any way?

Alex Miller (Clojure team) 2025-04-28T21:21:08.728289Z

should work (as long as you are not doing pathological things)

stathissideris 2025-04-28T21:23:41.015249Z

I’m using it to handle pagination, the process notifies itself about fetching the next page

Alex Miller (Clojure team) 2025-04-28T21:25:43.526879Z

seems ok to me

🙏 1
jmv 2025-04-28T21:11:38.844919Z

i have a project at work with different data processing jobs that are all deployed together. would there be anything wrong with putting multiple disjoint graphs into one flow? it looks ok at first glance but just want to make sure i didn't miss anything i would put them into different flows however i would like to use flow monitor. binding a port for each flow isn't the easiest path with the infra i deploy to.

Alex Miller (Clojure team) 2025-04-28T21:21:37.813029Z

nothing wrong I don't think, as long as the overall lifecycle is ok to be combined

jmv 2025-04-28T21:40:10.875789Z

excellent! i think some namespaced keys and functions to merge the flows will make this pretty easy.

Alex Miller (Clojure team) 2025-04-28T21:33:31.524769Z

14
❤️ 7
💜 1
chucklehead 2025-04-29T23:40:04.306709Z

Rendering of input buffers appears to be incorrect (or at least confusing) when doing many-to-one. A flow definition like:

{:procs {:scheduler-1 {:args {:wait 1000}
                       :proc (flow/process #'scheduler)}
         :scheduler-2 {:args {:wait 5000}
                       :proc (flow/process #'scheduler)}
         :notifier {:proc (flow/process #'notifier)
                    :chan-opts {:in {:buf-or-n (a/sliding-buffer 3)}}}}
 :conns [[[:scheduler-1 :out] [:notifier :in]]
         [[:scheduler-2 :out] [:notifier :in]]]}
renders the common notifier buffer for both scheduler instances

Alex Miller (Clojure team) 2025-04-29T23:49:28.311279Z

There are two channels there so I think that’s correct.

Alex Miller (Clojure team) 2025-04-29T23:50:34.723949Z

There is a single port on notifier but different channels between the schedules and the notifier

chucklehead 2025-04-30T00:28:07.939919Z

If one scheduler is resumed while the other scheduler and notifier remain paused, both buffers appear to fill simultaneously even though nothing is emitted on scheduler-2's out channel.

chucklehead 2025-04-30T00:38:15.760169Z

I'm a bit ignorant when it comes to core.async and obviously new to core.async.flow. Based on the config I expected a single 3-slot buffer on notifier's input shared by all sources, which seems to match the observed behavior. But based on the monitor rendering I would naively expect the buffers on each channel to be distinct and unpausing scheduler-1 to have no effect on scheduler-2's buffer.

Alex Miller (Clojure team) 2025-04-30T15:59:33.791579Z

@jarrodctaylor will take a look today and get back to you

Jarrod Taylor (Clojure team) 2025-04-30T19:09:17.862339Z

This is an issue with the visualizer. The visualizer is drawing multiple boxes that are really the same channel. I will work on getting it to render more accurately. Thanks for testing it out and providing feedback @chuck.cassel

Alex Miller (Clojure team) 2025-04-30T19:23:06.159849Z

yeah, I was wrong above - sorry about that! (I'm still learning flow too :)

chucklehead 2025-04-30T20:03:59.361449Z

Thank you both. Looking forward to playing with it more myself and excited to see what comes out of the community.