Fork me on GitHub
#core-async
<
2023-01-18
>
winsome23:01:00

I've got a thread of sequence transformations and I need to add one that returns a channel:

(async/go
  (->> xs
       (sort-by foo)
       (partition-by bar)
       (map (fn [zs] (async/go (async/<! (some-chan zs)))))))
What's the best way to collect the result from each chan? I've tried async/merge but that is either literally merging the results or only returning the last one.

hiredman23:01:24

hard to tell what results you actually want

winsome23:01:54

I've got a coll of channels and I want a coll of the things inside the channels

hiredman23:01:00

merge will get you a channel where everything read from a collections of channels will be published

hiredman23:01:41

you might want something like async/into

hiredman23:01:02

(async/<! (async/into [] chs))

hiredman23:01:24

sorry, no you need to merge then into

hiredman23:01:47

(async/<! (async/into [] (async/merge chs)))

hiredman23:01:36

something to keep in mind with merge is that it is unordered

winsome23:01:43

🎉 into was what I was missing.

winsome23:01:55

Good note, though - ordering is important in this case.

hiredman23:01:58

its more complicated, but if you feed the partitions into a channel using one of the to-chans you can use pipeline-async to replace the map and have the output be a single channel in order that you can then use into on

🙏 2
Jakub Holý (HolyJak)19:01:21

I was just wondering the same today. Thanks for the tip, hiredman!

Jakub Holý (HolyJak)21:01:58

Something like this?

(a/pipeline-async 15 out-chan
                  (fn [v ch] (a/go (a/>! ch (a/<! v)) (a/close! ch)))
                  (a/to-chan! chs))
?

winsome21:01:42

Mine turned out like this:

(async/go
    (let [error-ch (async/chan)
          out-ch     (async/chan)
          results-ch (async/into [] out-ch)

          work-ch  (->> (sort-by foo xs)
                           (partition-by bar)
                           (async/to-chan))]

      (async/pipeline-async 2
                            out-ch
                            (fn [work-partittion ch]
                              (-> (chan-returning-work-fn work-partition error-ch)
                                  (async/pipe ch)))
                            work-ch)
      (async/alt!
        error-ch ([e] e)
        results-ch ([result] result))))

Jakub Holý (HolyJak)22:01:26

ah, old good a/pipe! thanks!

Peter Chan02:01:05

Is there a built-in function that does this? (async/<! (async/into [] chs)) Similarly, a lazy version? I find myself writing the above quite often for tests / debugging.