I've got a thread of sequence transformations and I need to add one that returns a channel:
(async/go
  (->> xs
       (sort-by foo)
       (partition-by bar)
       (map (fn [zs] (async/go (async/<! (some-chan zs)))))))
What's the best way to collect the result from each chan? I've tried async/merge but that is either literally merging the results or only returning the last one.
hard to tell what results you actually want
I've got a coll of channels and I want a coll of the things inside the channels
merge will get you a channel where everything read from a collection of channels will be published
you might want something like async/into
(async/<! (async/into [] chs))
sorry, no you need to merge then into
(async/<! (async/into [] (async/merge chs)))
something to keep in mind with merge is that it is unordered
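For reference, a minimal sketch of the merge-then-into approach described above. Here `some-chan` is a hypothetical stand-in (not the original poster's function) that returns a channel yielding a single value, and the blocking `<!!` is used only so the snippet can run at the top level of a REPL.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the real work: a channel that yields one value.
(defn some-chan [x]
  (async/go (* 10 x)))

(def chs (map some-chan [1 2 3]))

;; merge funnels every source channel into a single channel;
;; into drains that channel into a vector once it closes.
(def results
  (async/<!! (async/into [] (async/merge chs))))

;; Arrival order is not guaranteed, so compare the results as a set.
(= #{10 20 30} (set results))
```

Inside a go block the same expression would use `async/<!` instead of `async/<!!`.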
🎉 into was what I was missing.
Good note, though - ordering is important in this case.
it's more complicated, but if you feed the partitions into a channel using one of the to-chans, you can use pipeline-async to replace the map and have the output be a single, ordered channel that you can then use into on
I was just wondering the same today. Thanks for the tip, hiredman!
Something like this?
(a/pipeline-async 15 out-chan
                  (fn [v ch] (a/go (a/>! ch (a/<! v)) (a/close! ch)))
                  (a/to-chan! chs))
?
Mine turned out like this:
(async/go
  (let [error-ch   (async/chan)
        out-ch     (async/chan)
        results-ch (async/into [] out-ch)
        work-ch    (->> (sort-by foo xs)
                        (partition-by bar)
                        (async/to-chan))]
    (async/pipeline-async 2
                          out-ch
                          (fn [work-partition ch]
                            (-> (chan-returning-work-fn work-partition error-ch)
                                (async/pipe ch)))
                          work-ch)
    (async/alt!
      error-ch ([e] e)
      results-ch ([result] result))))
ah, good old a/pipe! thanks!
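A self-contained sketch of the same pipeline-async idea, in case it helps to see the ordering guarantee in isolation. `slow-inc` and `ordered-results` are hypothetical names made up for this example, the error channel is left out for brevity, and `to-chan!` assumes a recent core.async release (older ones only have `to-chan`).

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical async worker: returns a channel that yields one result
;; after a random delay, so completion order differs from input order.
(defn slow-inc [x]
  (async/go
    (async/<! (async/timeout (rand-int 50)))
    (inc x)))

(defn ordered-results
  "Runs slow-inc over xs with up to 4 calls in flight and returns the
  results as a vector in input order."
  [xs]
  (let [out-ch (async/chan)]
    (async/pipeline-async
      4
      out-ch
      ;; pipe copies the worker's value onto ch and closes ch afterwards,
      ;; which is what pipeline-async expects of its async function.
      (fn [x ch] (async/pipe (slow-inc x) ch))
      (async/to-chan! xs))
    ;; out-ch closes once the input is exhausted, so into can finish.
    (async/<!! (async/into [] out-ch))))

(ordered-results (range 10))
```

pipeline-async emits outputs in the order of its inputs even when the individual calls finish out of order, which is the property merge lacks.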
Is there a built-in function that does this?
(async/<! (async/into [] chs))
Similarly, a lazy version? I find myself writing the above quite often for tests / debugging.
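One possible answer, sketched under the assumption that every channel delivers exactly one value: `async/map` takes the first item from each source channel and applies a function to them positionally, so passing `vector` gives an ordered collection. `delayed` is a hypothetical helper used only to make the channels finish out of order.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: a channel that yields v after ms milliseconds.
(defn delayed [ms v]
  (async/go
    (async/<! (async/timeout ms))
    v))

(def chs [(delayed 30 :a) (delayed 10 :b) (delayed 20 :c)])

;; async/map applies vector to the first value taken from each channel,
;; in channel order, regardless of which channel delivers first.
(def results (async/<!! (async/map vector chs)))

;; For tests and debugging, a lazy variant: each element blocks on its
;; channel only when realized. Do not use <!! inside a go block.
(def lazy-results
  (map async/<!! [(delayed 30 :a) (delayed 10 :b)]))
```

Note that `async/map` closes its output as soon as any source channel closes, so a channel that closes without delivering a value yields nil for the whole result rather than a partial vector.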