core-async

winsome 2023-01-18T23:05:00.283749Z

I've got a thread of sequence transformations and I need to add one that returns a channel:

(async/go
  (->> xs
       (sort-by foo)
       (partition-by bar)
       (map (fn [zs] (async/go (async/<! (some-chan zs)))))))
What's the best way to collect the result from each chan? I've tried async/merge but that is either literally merging the results or only returning the last one.

2023-01-18T23:13:24.319609Z

hard to tell what results you actually want

winsome 2023-01-18T23:13:54.006469Z

I've got a coll of channels and I want a coll of the things inside the channels

2023-01-18T23:14:00.673849Z

merge will get you a channel where everything read from a collection of channels will be published

2023-01-18T23:14:41.571099Z

you might want something like async/into

2023-01-18T23:15:02.566189Z

(async/<! (async/into [] chs))

2023-01-18T23:15:24.394039Z

sorry, no you need to merge then into

2023-01-18T23:15:47.983029Z

(async/<! (async/into [] (async/merge chs)))

2023-01-18T23:16:36.279029Z

something to keep in mind with merge is that it is unordered
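
A minimal sketch of the merge-then-into suggestion above, in case it helps to see it end to end. `some-chan` here is a hypothetical stand-in that yields one value and closes; the real one in the question is not shown. Since merge makes no ordering promise, the result is checked as a set rather than as a vector.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for some-chan: a channel that yields one value, then closes.
(defn some-chan [x]
  (async/go (* 10 x)))

(def chs (map some-chan [1 2 3]))

;; merge funnels every value from every source into one channel;
;; into collects them into a vector once the merged channel closes,
;; which happens after all sources have closed.
(def result (async/<!! (async/into [] (async/merge chs))))

;; Arrival order is not guaranteed, so compare as a set.
(= #{10 20 30} (set result))
```

If the order of the inputs matters, this approach is probably not enough on its own.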

winsome 2023-01-18T23:16:43.490779Z

🎉 into was what I was missing.

winsome 2023-01-18T23:16:55.458819Z

Good note, though - ordering is important in this case.

2023-01-18T23:25:58.299729Z

it's more complicated, but if you feed the partitions into a channel using one of the to-chans, you can use pipeline-async to replace the map and have the output be a single channel, in order, that you can then use into on
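
A rough sketch of that ordered variant, under some assumptions: `slow-square` is a made-up async worker (later inputs finish sooner, to show that output order still follows input order), and `to-chan!` assumes a core.async version that has it (older versions only have `to-chan`).

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical worker: returns a channel; larger inputs complete sooner.
(defn slow-square [x]
  (async/go
    (async/<! (async/timeout (- 50 (* 10 x))))
    (* x x)))

(def out-ch (async/chan))

(async/pipeline-async
  4                                  ; parallelism
  out-ch
  (fn [x result-ch]
    ;; pipe copies the worker's value onto result-ch and closes result-ch
    ;; when the worker's channel closes, which pipeline-async expects.
    (async/pipe (slow-square x) result-ch))
  (async/to-chan! [1 2 3 4]))

;; out-ch closes once the input channel is exhausted, so into can finish.
(def ordered (async/<!! (async/into [] out-ch)))
;; ordered => [1 4 9 16]
```

pipeline-async documents that outputs are returned in order relative to the inputs, which is what makes the final vector line up with the input collection even though the workers finish out of order.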

Jakub Holý (HolyJak) 2023-01-26T19:34:21.103769Z

I was just wondering the same today. Thanks for the tip, hiredman!

Jakub Holý (HolyJak) 2023-01-26T21:22:58.186409Z

Something like this?

(a/pipeline-async 15 out-chan
                  (fn [v ch] (a/go (a/>! ch (a/<! v)) (a/close! ch)))
                  (a/to-chan! chs))
?

winsome 2023-01-26T21:43:42.363269Z

Mine turned out like this:

(async/go
  (let [error-ch   (async/chan)
        out-ch     (async/chan)
        results-ch (async/into [] out-ch)
        work-ch    (->> (sort-by foo xs)
                        (partition-by bar)
                        (async/to-chan))]
    (async/pipeline-async 2
                          out-ch
                          (fn [work-partition ch]
                            ;; pipe closes ch once the work channel closes
                            (-> (chan-returning-work-fn work-partition error-ch)
                                (async/pipe ch)))
                          work-ch)
    ;; return whichever arrives first: an error or the collected results
    (async/alt!
      error-ch   ([e] e)
      results-ch ([result] result))))

Jakub Holý (HolyJak) 2023-01-26T22:14:26.509989Z

ah, good old a/pipe! thanks!

Peter Chan 2023-01-27T02:59:05.138459Z

Is there a built-in function that does this?

(async/<! (async/into [] chs))

Similarly, a lazy version? I find myself writing the above quite often for tests / debugging.
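
For tests and debugging, one possible shape for such a helper is sketched below. `collect-in-order` and `make-chs` are hypothetical names, not part of core.async; the helper takes one value from each channel in turn, so the result follows the order of the collection. The last form shows that `async/map` with `vector` appears to cover the same one-value-per-channel case, which may be close to the built-in being asked about.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper, not part of core.async: takes one value from each
;; channel in turn and returns a channel that yields them as a vector.
(defn collect-in-order [chs]
  (async/go-loop [remaining (seq chs)
                  acc       []]
    (if remaining
      (let [v (async/<! (first remaining))]
        (recur (next remaining) (conj acc v)))
      acc)))

;; Hypothetical fixture: fresh single-value channels on every call.
(defn make-chs []
  (mapv #(async/go (inc %)) [1 2 3]))

(def collected (async/<!! (collect-in-order (make-chs))))
;; collected => [2 3 4]

;; async/map applies f to the first item taken from each source channel.
(def mapped (async/<!! (async/map vector (make-chs))))
;; mapped => [2 3 4]
```

Both versions only read the first value from each channel, so they fit the "one result per channel" case in this thread rather than channels that carry many values.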