core-async

Santiago Cabrera 2026-02-01T13:29:17.328519Z

Hi, am I doing something wrong with :chan-opts? I don't see the issue in my code.

;; Repro: a two-process flow in which :hello (a step lifted from `identity`)
;; is connected to :print (a step that prn's each value and passes it through).
;; :hello's :out is given the transducer (map inc) via :chan-opts.
(def f (flow/create-flow
         {:procs {:hello {:proc      (flow/process (flow/lift1->step identity))
                          :chan-opts {:out {:xform (map inc)}}} ; <--- intended to increment each value on :hello's :out
                  :print {:proc (flow/process (flow/lift1->step #(do (prn %) %)))}}
          ;; single connection: [:hello :out] -> [:print :in]
          :conns [[[:hello :out] [:print :in]]]}))

;; Start the flow, then resume it, before injecting any values.
(flow/start f)
(flow/resume f)
; This prints 1 2 3; I'd expect it to be 2 3 4 because of the (map inc) xform on [:hello :out].
; Inject the values 1 2 3 at the [:hello :out] coordinate and deref (@) the result.
@(flow/inject f [:hello :out] [1 2 3])                      

Santiago Cabrera 2026-02-01T17:28:58.243289Z

Looks like :chan-opts only works for input channels.

;; Two-process flow: :hello (a step lifted from `identity`) is connected to
;; :print (a step that prn's each value and passes it through).
;; Here the (map inc) transducer is set on :print's :in via :chan-opts,
;; together with :buf-or-n 1.
(def f (flow/create-flow
         {:procs {:hello {:proc      (flow/process (flow/lift1->step identity))}
                  :print {:proc (flow/process (flow/lift1->step #(do (prn %) %)))
                          ;; xform and buffer configured on the input side
                          :chan-opts {:in {:xform (map inc)
                                           :buf-or-n 1}}}}
          ;; single connection: [:hello :out] -> [:print :in]
          :conns [[[:hello :out] [:print :in]]]}))

;; Start the flow, then resume it, before injecting any values.
(flow/start f)
(flow/resume f)
; This prints 2 3 4 as expected
; Inject the values 1 2 3 at the [:hello :out] coordinate and deref (@) the result.
@(flow/inject f [:hello :out] [1 2 3])

phronmophobic 2026-02-04T18:56:06.189679Z

multiple outputs can map onto the same input channel

phronmophobic 2026-02-04T18:56:32.654939Z

so while there's a channel for each input, there may not be a channel for each output.