Edit: I figured out I need to close c myself 🙂 make sense.
Probably a simple question but why does this code not work?
I'm migrating some stuff to async-pipeline and I think I don't understand it's semantics:
(let [s (async/chan 100)
r (async/chan 100)]
(async/pipeline-async 3
r
(fn [x c]
(async/go
(async/>! c (inc x))
;; this was missing
(async/close! c))
s)
(async/>!! s 1)
(async/>!! s 2)
(async/>!! s 3)
(async/close! s)
(async/
I expected the final call to return [2 3 4] but it just hangs.
To my understanding the pipeline is set up, processes the values from s and then stops when I close the source s (along with closing r). After that I should be able to read everything from r's buffer (last call).
What am I missing?Why do you have an async go inside the transducer?
Oh, pipeline-async. I see now. But you know that you're not doing async work here? I'm assuming it's just an example.
Otherwise, if all you are doing is compute, you should just use pipeline
You'd use pipeline-async if say you are making an async http request.
And you'd use pipeline-blocking if say you are making a sync http request
Also, something a bit confusing. But inside the AF you can actually put multiple results on the channel. And then why all get processed and put on the result channel. You do need to close! the channel once you are done putting on it.
It's just an example :)