core-async

Reut Sharabani 2025-04-19T17:05:20.892539Z

Edit: I figured out I need to close c myself 🙂 make sense. Probably a simple question but why does this code not work? I'm migrating some stuff to async-pipeline and I think I don't understand it's semantics:

(let [s (async/chan 100)
        r (async/chan 100)]
    (async/pipeline-async 3
                          r
                          (fn [x c]
                            (async/go
                              (async/>! c (inc x))
                              ;; this was missing
                              (async/close! c))
                          s)
    (async/>!! s 1)
    (async/>!! s 2)
    (async/>!! s 3)
    (async/close! s)

    (async/
I expected the final call to return [2 3 4] but it just hangs. To my understanding the pipeline is set up, processes the values from s and then stops when I close the source s (along with closing r). After that I should be able to read everything from r's buffer (last call). What am I missing?

2025-04-19T18:27:05.237889Z

Why do you have an async go inside the transducer?

2025-04-19T18:29:44.006149Z

Oh, pipeline-async. I see now. But you know that you're not doing async work here? I'm assuming it's just an example.

2025-04-19T18:30:46.963539Z

Otherwise, if all you are doing is compute, you should just use pipeline

2025-04-19T18:31:15.415159Z

You'd use pipeline-async if say you are making an async http request.

2025-04-19T18:31:41.451439Z

And you'd use pipeline-blocking if say you are making a sync http request

2025-04-19T18:38:16.870169Z

Also, something a bit confusing. But inside the AF you can actually put multiple results on the channel. And then why all get processed and put on the result channel. You do need to close! the channel once you are done putting on it.

Reut Sharabani 2025-04-19T20:01:36.351279Z

It's just an example :)