core-async

Reut Sharabani 2026-01-13T01:40:19.344859Z

This was probably discussed before, but I was surprised to find out that async/pipeline* limits throughput significantly because it delivers results in the order their inputs were received. An example:

(require '[clojure.core.async :as async])

(do
  (def c1 (async/chan 1))
  (def c2 (async/chan 1))
  (async/pipeline-blocking 2 c2 (map (fn [j]
                                       (println "processing " j)
                                       (Thread/sleep j)
                                       (println "processed " j)
                                       j)) c1)
  ;; consumer: the original loop body was lost in extraction;
  ;; draining c2 until it closes is a reconstruction
  (async/go-loop [i (async/<! c2)]
    (when i
      (recur (async/<! c2))))
  (async/>!! c1 95)
  (async/>!! c1 96)
  (async/>!! c1 10000) ;; slow task
  (async/>!! c1 97)
  (async/>!! c1 98)
  (async/>!! c1 99)
  (async/>!! c1 100)
  (async/>!! c1 101)
  (async/>!! c1 102)
  (async/>!! c1 103))
(async/close! c1)
Obviously the 10 second message could be "dealt with" while all the other messages finish, but instead it blocks them even though there's "concurrency" available (the following message can get processed, but its result waits for the slow message to finish so it can be delivered in order). This is very implicit IMO, and very bad for blocking operations, which naturally have variance in their processing time. I'm not sure what a nice alternative would be, but I would like to hear about alternatives that won't choke throughput for tasks with a big difference in processing time.

2026-01-13T02:01:47.846859Z

I suspect the name pipeline is taken to mean "in order"

phronmophobic 2026-01-13T02:02:44.527769Z

per the docs at https://clojuredocs.org/clojure.core.async/pipeline:

> Outputs will be returned in order relative to the inputs.

2026-01-13T02:02:56.676269Z

The pipeline impl largely ties itself in knots to keep that guarantee

2026-01-13T02:03:35.961569Z

You can, for example, easily just throw work on a bounded executor and get results in the order they arrive

2026-01-13T02:04:38.475749Z

the complicating factor there ends up being back pressure on the queue depth of the executor
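
For illustration, here is a minimal sketch of the "bounded executor, results in the order they arrive" idea using plain Java interop, with no core.async involved. The name unordered-results is hypothetical, and the sketch submits every input eagerly, so it deliberately does not solve the back pressure problem mentioned above.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn unordered-results
  "Runs (f x) for every x in xs on a fixed pool of n threads and returns
   a vector of results in completion order, not input order.
   Hypothetical helper: all of xs is submitted up front, so the executor's
   queue is unbounded and there is no back pressure."
  [n f xs]
  (let [pool (Executors/newFixedThreadPool n)
        ecs  (ExecutorCompletionService. pool)
        xs   (vec xs)]
    (try
      (doseq [x xs]
        (.submit ecs (fn [] (f x))))
      ;; .take blocks until the next task completes, whichever one that is
      (mapv (fn [_] (.get (.take ecs))) xs)
      (finally
        (.shutdown pool)))))

;; The slow item no longer holds up the fast ones:
(unordered-results 2 (fn [ms] (Thread/sleep (long ms)) ms) [300 10 20])
;; => [10 20 300]
```

With two threads, one is stuck on the 300 ms task while the other finishes the 10 ms and 20 ms tasks, so the fast results come out first.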

Reut Sharabani 2026-01-13T02:19:56.271989Z

@hiredman I thought the pipeline was for tying channels together in ordered operations (and I use them to move items between channels). I opted for a naive implementation with the same API, but it just spawns n worker threads (async/thread) and closes everything down when the channel is closed. It's good enough for me. No need to be cynical about the function name. I don't speak English natively and words don't always build as good a mental model for me. @smith.adriane I probably read the docs years ago, and even when you read them, it is only implicit that a slow operation will block all other operations. It's obvious once you think about it, but you still have to think about it. I probably won't forget it now 🙂

Reut Sharabani 2026-01-13T02:21:21.222779Z

And I am still wondering what's the idiomatic way to move items around channels with concurrency n (other than manually spawning workers). Without ordering guarantees

2026-01-13T02:23:35.554539Z

Not sure what you mean about cynical? Pipelines are generally considered to be ordered things, which is why you get "pipeline stalls" when something takes a long time

phronmophobic 2026-01-13T02:23:41.639169Z

> I opted for a naive implementation with the same API, but it just spawns n worker threads (async/thread) and closes everything down when channel is closed. It's good enough for me.

I don't think there's anything wrong with that approach. Any different approach will depend on the requirements of your use case.

Reut Sharabani 2026-01-13T02:25:30.146679Z

@smith.adriane I expected to have something like pipeline (pipeline-blocking in my case) that's not ordered. Meaning just move items with some transformation and do it over x threads (blocking)/go-threads (async).

Reut Sharabani 2026-01-13T02:28:22.481339Z

@hiredman I thought your comment about the name was in the spirit of "you should have obviously understood the name and implications" (I didn't). Maybe it's just the difficulty of text communication.

phronmophobic 2026-01-13T02:33:41.320379Z

You can file a request using https://ask.clojure.org

phronmophobic 2026-01-13T02:34:57.842529Z

I don't think there's a core.async function that does that directly, but it shouldn't take much code to implement. Happy to review your code if you want feedback.

2026-01-13T02:37:20.160529Z

Oh, I mean it is a common enough point that people trip on, also with seque or pmap

πŸ‘ 1
2026-01-13T02:38:12.532479Z

pvalues (which I have never seen anyone use)

Reut Sharabani 2026-01-13T02:46:34.306939Z

@smith.adriane this is what I use now (implemented in 5 minutes just now so there are probably bugs, didn't run it yet other than in a comment block):

(defn spawn-worker [worker-name to f from ex-handler]
  (async/thread
    (log/info (format "Starting worker %s" worker-name))
    (loop [item (async/<!! from)]
      (when item
        (async/>!! to
                   (try
                     (f item)
                     (catch Exception e
                       (ex-handler e item))))
        (recur (async/<!! from))))
    (log/info (format "Worker %s exiting" worker-name))))

(defn pipe
  "process items from 'from' channel with function 'f' using 'n' workers,
   putting results into 'to' channel. Exceptions during processing are handled
   by 'ex-handler' function.
   ex-handler takes two arguments [e item]: the exception and the item being processed (before f is applied).
   will close 'to' channel when from channel is closed and all workers have exited."
  [pipe-name n to f from ex-handler]
  (log/info (format "Starting pipeline [%s] with %d workers" pipe-name n))
  (let [workers (for [i (range n)]
                  (let [worker-name (str pipe-name i)]
                    (spawn-worker worker-name to f from ex-handler)))]
    (async/thread
      (doseq [worker workers]
        (async/<!! worker))
      (log/info (format "All pipeline [%s] workers have exited" pipe-name))
      (async/close! to))))
I don't care much about it being optimized as my work is dominated by IO (hence the problem with ordered processing).

dpsutton 2026-01-13T03:06:37.898299Z

i think there's a gist from ghadi that removes the in-order property

phronmophobic 2026-01-13T03:26:49.225219Z

Looks good to me. Here are some minor suggestions which you may or may not need:
• terminate worker when to is closed by checking result of async/>!!
• make spawn-worker private
• use empty loop bindings to deduplicate code for getting items
• I didn't make any fixes, but a worker will crash if either f or ex-handler returns nil.
• Replace for with into when creating workers. I generally encourage folks to avoid lazy sequences if you care about when code is run. For small n, it probably doesn't change the behavior due to chunking, but for large n, some workers may not spawn until other workers finish!

(defn ^:private spawn-worker [worker-name to f from ex-handler]
  (async/thread
   (log/info (format "Starting worker %s" worker-name))
   (loop []
     ;; stop when 'from' is closed and drained
     (when-let [item (async/<!! from)]
       ;; >!! returns false once 'to' is closed, which also stops the worker
       (when (async/>!! to (try
                             (f item)
                             (catch Exception e
                               (ex-handler e item))))
         (recur))))
   (log/info (format "Worker %s exiting" worker-name))))

(defn pipe
  "process items from 'from' channel with function 'f' using 'n' workers,
   putting results into 'to' channel. Exceptions during processing are handled
   by 'ex-handler' function.
   ex-handler takes two arguments [e item]: the exception and the item being processed (before f is applied).
   will close 'to' channel when from channel is closed and all workers have exited."
  [pipe-name n to f from ex-handler]
  (log/info (format "Starting pipeline [%s] with %d workers" pipe-name n))
  ;; into with a transducer is eager, so all n workers start immediately
  (let [workers (into []
                      (map (fn [i]
                             (let [worker-name (str pipe-name i)]
                               (spawn-worker worker-name to f from ex-handler))))
                      (range n))]
    (async/thread
      (doseq [worker workers]
        (async/<!! worker))
      (log/info (format "All pipeline [%s] workers have exited" pipe-name))
      (async/close! to))))
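
To make the for versus into suggestion concrete, here is a small sketch that counts how many "workers" have been started once only the first one is asked for. It uses a fake spawn function and a counter instead of real threads; spawned-after-first and fake-spawn are hypothetical names used only for this illustration.

```clojure
(defn spawned-after-first
  "Builds 100 fake workers with `build` and returns how many had been
   started once only the first worker was requested."
  [build]
  (let [started    (atom 0)
        fake-spawn (fn [i] (swap! started inc) i)
        workers    (build fake-spawn (range 100))]
    (first workers)
    @started))

;; for is lazy and realizes a chunked input one chunk (32 items) at a time
(spawned-after-first (fn [spawn ids] (for [i ids] (spawn i))))
;; => 32

;; into with a transducer is eager: every worker is started up front
(spawned-after-first (fn [spawn ids] (into [] (map spawn) ids)))
;; => 100
```

In the for-based pipe, the doseq that waits on each worker is what realizes the sequence, so with more than 32 workers the later ones would only be spawned after the first chunk of workers has exited.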

πŸ™ 1