This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-10-23
How to retrieve items from multiple channels (each containing 0-1 items)? What I want to do is something like this:
(let [[err-ch data-ch] (a/split throwable? input-chan)
result-ch (a/reduce extract-and-combine-data nil data-ch)
errors-ch (a/into [] err-ch)]
(a/go {:result (a/<! result-ch) :errors (a/<! errors-ch)}))
The problem is that I have to consume all input-chan items - and therefore all items on both err-ch and data-ch - for all three channels to close (because of back-pressure and buffers of size 0). For example, if input-chan only contains errors then the code will block forever at (a/<! result-ch) because the (empty) result-ch won't be closed until errors-ch is consumed. Is there a working way to do this?
What I can think of is:
1. Wrap both result-ch and errors-ch into a buffer 1 channel so that they will not need to wait to be read from to consume the inputs (i.e. add result-ch' (chan 1), _ (pipe result-ch result-ch'))
2. Use alts! to read whatever comes first and then read the other channel:
(a/go (let [[v ch] (a/alts! [result-ch errors-ch])
            res (if (= ch result-ch) v (a/<! result-ch))
            err (if (= ch errors-ch) v (a/<! errors-ch))]
        {:result res :errors err}))
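A minimal runnable sketch of option 2, put together from the snippets above. The names throwable? and collect are assumptions made for illustration, and conj with an initial [] stands in for extract-and-combine-data, which is not shown in the thread.

```clojure
(require '[clojure.core.async :as a])

;; Assumed helper: the thread uses throwable? without defining it.
(defn throwable? [x] (instance? Throwable x))

(defn collect
  "Hypothetical wrapper: returns a channel that yields
  {:result ... :errors [...]} once input-chan has been closed."
  [input-chan]
  (let [[err-ch data-ch] (a/split throwable? input-chan)
        ;; conj/[] is a placeholder for extract-and-combine-data
        result-ch (a/reduce conj [] data-ch)
        errors-ch (a/into [] err-ch)]
    (a/go
      ;; Take whichever aggregate is ready first, then read the other one.
      (let [[v ch] (a/alts! [result-ch errors-ch])
            res (if (= ch result-ch) v (a/<! result-ch))
            err (if (= ch errors-ch) v (a/<! errors-ch))]
        {:result res :errors err}))))
```

From the REPL this could be tried with something like (a/<!! (collect (a/to-chan [1 (ex-info "boom" {}) 2]))).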
How do you do troubleshooting with async? Normally I just put (def *args [arg1 ...]) into a function to capture its arguments and be able to play with it from the REPL but that is useless for channels as they will be already consumed. So I guess I need to make a copy of the channel using mult. Or?
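One way the mult idea could look, as a sketch only: the helper name spy and its return shape are made up for this example. It taps the source twice, once for the real consumer and once for a debug copy collected into an atom that can be inspected from the REPL.

```clojure
(require '[clojure.core.async :as a])

(defn spy
  "Hypothetical helper. Returns {:out ch :captured atom :done ch}:
  :out passes on everything from src, :captured collects a copy of each
  item, and :done closes once the debug copy has been fully drained."
  [src]
  (let [m        (a/mult src)
        out      (a/chan)
        dbg      (a/chan 16)   ; small buffer so the debug tap rarely stalls the mult
        captured (atom [])]
    ;; Add taps before anything is put on src: a mult drops items
    ;; it receives while it has no taps.
    (a/tap m out)
    (a/tap m dbg)
    {:out      out
     :captured captured
     :done     (a/go-loop []
                 (when-some [v (a/<! dbg)]
                   (swap! captured conj v)
                   (recur)))}))
```

The consumer then reads from :out instead of the original channel, and @(:captured s) holds the items seen so far.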
@holyjak why do you need to process both the results and errors at the same time? why not have separate functions/go blocks?
1. I assume I must consume both channels so that the writer isn't blocked. 2. I want to report # errors vs # ordinary items.
@holyjak, if I understood your problem correctly: have you tried consuming all items into a collection to “unpack” the data from the channels?
(a/into coll input-chan)
then
(split-with throwable? coll)
Apparently, all data is already being streamed through the input-chan, unless I’m missing something. It would be a matter of consuming everything into a collection and splitting it later, given that the input channel is closed to signal completion.
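A sketch of this drain-then-split suggestion, with two adjustments that are assumptions of this example rather than part of the message above: a/into returns a channel, so its value is taken inside a go block, and group-by replaces split-with, because split-with only splits at the first non-matching item and would not separate interleaved errors and data. The names throwable? and drain-and-partition are hypothetical.

```clojure
(require '[clojure.core.async :as a])

;; Assumed helper: the thread uses throwable? without defining it.
(defn throwable? [x] (instance? Throwable x))

(defn drain-and-partition
  "Hypothetical helper: reads input-chan until it closes, then returns
  (on a channel) the items separated into :errors and :result."
  [input-chan]
  (a/go
    (let [items   (a/<! (a/into [] input-chan))
          by-type (group-by throwable? items)]
      {:result (get by-type false [])
       :errors (get by-type true [])})))
```

This keeps everything in memory until the input closes, which seems acceptable only when the number of items is small.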