Jakub Holý (HolyJak)05:10:41

How to retrieve items from multiple channels (each containing 0-1 items)? What I want to do is something like this:

(let [[err-ch data-ch] (a/split throwable? input-chan)
      result-ch (a/reduce extract-and-combine-data nil data-ch)
      errors-ch (a/into [] err-ch)]
  (a/go {:result (a/<! result-ch) :errors (a/<! errors-ch)}))
The problem is that I have to consume all input-chan items, and therefore all items on both err-ch and data-ch, for all three channels to close (because of back-pressure and buffers of size 0). For example, if input-chan contains only errors, then the code will block forever at (a/<! result-ch), because the (empty) result-ch won't be closed until errors-ch is consumed. Is there a working way to do this? What I can think of is:
1. Wrap both result-ch and errors-ch in a channel with a buffer of size 1, so that they do not need to wait to be read from in order to consume the inputs (i.e. add result-ch' (chan 1), _ (pipe result-ch result-ch')).
2. Use alts! to read whatever comes first and then read the other channel:
(a/go
  (let [[v ch] (a/alts! [result-ch errors-ch])
        res (if (= ch result-ch) v (a/<! result-ch))
        err (if (= ch errors-ch) v (a/<! errors-ch))]
    {:result res :errors err}))
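
A self-contained sketch of the alts! approach above, for trying at the REPL. Several things here are assumptions rather than the original code: `throwable?` is given a plausible definition, `conj` into a vector stands in for extract-and-combine-data, and `collect` and `chan-of` are hypothetical helper names. As far as I can tell, a/reduce and a/into each run their own go loop, so both branches of a/split are drained concurrently and the errors-only case should also complete.

```clojure
(require '[clojure.core.async :as a])

;; Assumed definition; the original does not show throwable?.
(defn throwable? [x] (instance? Throwable x))

(defn chan-of
  "Hypothetical helper: a closed, buffered channel holding `items`."
  [items]
  (let [c (a/chan (max 1 (count items)))]
    (doseq [x items] (a/>!! c x))
    (a/close! c)
    c))

(defn collect
  "Returns a channel yielding {:result ... :errors ...} once input-chan closes."
  [input-chan]
  (let [[err-ch data-ch] (a/split throwable? input-chan)
        ;; conj into [] is a stand-in for extract-and-combine-data
        result-ch (a/reduce conj [] data-ch)
        errors-ch (a/into [] err-ch)]
    (a/go
      ;; Take whichever summary arrives first, then wait for the other one.
      (let [[v ch] (a/alts! [result-ch errors-ch])
            res (if (= ch result-ch) v (a/<! result-ch))
            err (if (= ch errors-ch) v (a/<! errors-ch))]
        {:result res :errors err}))))
```

Usage: `(a/<!! (collect (chan-of [1 (ex-info "boom" {}) 2])))` blocks the calling thread until the input is fully drained and then returns the combined map.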

Jakub Holý (HolyJak)09:10:40

How do you troubleshoot core.async code? Normally I just put (def *args [arg1 ...]) into a function to capture its arguments so that I can play with them from the REPL, but that is useless for channels, as they will already have been consumed. So I guess I need to make a copy of the channel using mult. Or is there a better way?
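
A minimal sketch of the mult idea mentioned above, assuming it is acceptable to replace the original channel with a tapped copy. `spy` is a hypothetical helper name and the buffer sizes are arbitrary. The debug tap uses a sliding buffer so that it never holds up the mult if nobody reads from it.

```clojure
(require '[clojure.core.async :as a])

(defn spy
  "Hypothetical helper: returns [main-ch debug-ch].
  Both channels receive every item subsequently taken from `src`,
  and both close when `src` closes."
  [src]
  (let [m        (a/mult src)
        main-ch  (a/chan 16)                        ; arbitrary buffer size
        debug-ch (a/chan (a/sliding-buffer 100))]   ; never blocks the mult
    (a/tap m main-ch)
    (a/tap m debug-ch)
    [main-ch debug-ch]))

;; Usage: wrap the channel BEFORE anything is put on it, because a mult
;; drops items that it takes while no taps are registered.
(comment
  (def src (a/chan 10))
  (let [[main-ch debug-ch] (spy src)]
    (def main-ch main-ch)      ; hand this to the code under test
    (def *debug-ch debug-ch))  ; inspect this one from the REPL
  (a/>!! src :hello)
  (a/poll! *debug-ch))
```

The code under test reads main-ch instead of the original channel, while the copy on the debug channel stays available for inspection.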


@holyjak why do you need to process both the results and errors at the same time? why not have separate functions/go blocks?

Jakub Holý (HolyJak)15:10:28

1. I assume I must consume both channels so that the writer isn't blocked. 2. I want to report the number of errors vs. the number of ordinary items.


@holyjak, if I understood your problem correctly: have you tried consuming all items into a collection to “unpack” the data from the channels?

(a/into coll input-chan)
(split-with throwable? coll)
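
A runnable sketch of this suggestion, with two adjustments that are not in the snippet above: a/into returns a channel, so its result has to be taken before it can be split, and split-with only cuts the sequence at the first non-matching item, so group-by is swapped in to separate errors from data regardless of order. `throwable?` is given an assumed definition and `split-all` is a hypothetical name.

```clojure
(require '[clojure.core.async :as a])

;; Assumed definition; the original does not show throwable?.
(defn throwable? [x] (instance? Throwable x))

(defn split-all
  "Hypothetical helper: returns a channel yielding {:errors [...] :data [...]}
  once input-chan has been closed and fully consumed."
  [input-chan]
  (a/go
    (let [items (a/<! (a/into [] input-chan))
          ;; group-by keys are the booleans returned by throwable?
          {errors true, data false} (group-by throwable? items)]
      {:errors (vec errors) :data (vec data)})))
```

This holds every item in memory until the input channel closes, which is fine for small inputs but gives up streaming.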

👎 4

what about attaching the cat transducer to a chan?


Apparently, all the data is already being streamed through input-chan, unless I’m missing something. It would be a matter of consuming everything into a collection and splitting it afterwards, given that the input channel is closed to signal completion.