Is this the correct way to concat channels?
(defn- concat-chs [chs]
  (let [ch (a/chan)]
    (a/pipeline-async 1 ch a/pipe (a/to-chan! chs))
    ch))
Merge?
Is order important?
very important
For 2 channels, something like
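(defn concat2
  [x y]
  (let [ch (a/chan)]
    (a/go
      ;; Drain x first; stop copying if ch was closed downstream.
      (loop []
        (let [a (a/<! x)]
          (when (and (some? a) (a/>! ch a))
            (recur))))
      ;; Then drain y and close ch once y is exhausted.
      (loop []
        (let [a (a/<! y)]
          (if (nil? a)
            (a/close! ch)
            (when (a/>! ch a)
              (recur))))))
    ;; Return the output channel, not the channel returned by go.
    ch))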
Then reduce over n channels?
pipe?
What would that look like?
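One possible shape for the n-channel version, as a rough sketch rather than a definitive answer: a single go-loop that drains each input channel in turn and closes the output once the last one is exhausted. The name `concat-all` is made up for illustration, and the sketch assumes every element of `chs` is a channel.

```clojure
(require '[clojure.core.async :as a])

(defn concat-all
  "Hypothetical helper: emits everything from the first channel in chs,
  then everything from the second, and so on. Closes the output channel
  after the last input channel closes."
  [chs]
  (let [out (a/chan)]
    (a/go-loop [chs (seq chs)]
      (if-let [c (first chs)]
        (let [v (a/<! c)]
          (cond
            (nil? v)     (recur (next chs)) ; current input drained, move on
            (a/>! out v) (recur chs)        ; value delivered, keep draining
            :else        nil))              ; out was closed downstream, stop
        (a/close! out)))                    ; no inputs left
    out))

;; Usage: collect the concatenated output into a vector.
(def concat-result
  (a/<!! (a/into [] (concat-all [(a/to-chan! [1 2])
                                 (a/to-chan! [3])
                                 (a/to-chan! [4 5])]))))
```

Unlike `a/merge`, this reads from only one input at a time, so later channels are not touched until the earlier ones have closed. `(reduce concat2 chs)` should give the same ordering, at the cost of one intermediate channel and go block per input.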
Is it correct that I should never use `put!`/`take!` and should always use `>!`/`<!` in go blocks, because `put!`/`take!` might throw while `>!`/`<!` park on backpressure and therefore have a better chance to recover?
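To make the `put!` failure mode concrete, here is a small sketch, assuming the limit of 1024 pending puts per channel that core.async enforces. `put!` never waits: with no consumer and no buffer, each call just queues a pending put, and the call that would exceed the limit throws an `AssertionError`. The names `ch` and `overflow` are only for illustration.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan)) ; unbuffered, and nobody is taking from it

;; Each put! returns immediately and leaves a pending put on the channel.
(dotimes [i 1024]
  (a/put! ch i))

;; The next pending put exceeds the limit and throws.
(def overflow
  (try
    (a/put! ch :one-too-many)
    :accepted
    (catch AssertionError _
      :rejected)))
```

As far as I know, a parked `>!` counts against the same limit, so the practical difference is that one go block can only have one pending put at a time, while a loop of `put!` calls can pile up without any backpressure.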
Error handling could be better…
core.async:
(<!! (a/reduce + 0 (a/to-chan! [1 2 3 4 5 "" 6])))
=> nil
manifold:
@(stream/reduce + 0 (stream/->source [1 2 3 4 5 "" 6]))
throws=> ClassCastException...
What should the approach to error handling be if I use core.async? Wrap everything in try/catch? Shouldn't async functions like `a/reduce`, `a/map`, etc. accept an error handler like `a/chan` does?
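One common workaround, sketched here as an assumption rather than an official pattern: wrap the reducing function so that an exception becomes the result value instead of escaping inside the go block. This relies on `a/reduce` honoring `reduced`, and `safe-rf` is a made-up helper name.

```clojure
(require '[clojure.core.async :as a])

(defn safe-rf
  "Hypothetical wrapper: runs f, and on any exception stops the reduction
  and returns the exception itself as the final value."
  [f]
  (fn [acc x]
    (try
      (f acc x)
      (catch Throwable t
        (reduced t)))))

;; The reduction now yields the exception as a value instead of nil.
(def reduce-result
  (a/<!! (a/reduce (safe-rf +) 0 (a/to-chan! [1 2 3 4 5 "" 6]))))
```

The caller then has to check whether the value taken from the channel is a `Throwable` and rethrow or handle it. Note that the reduction stops early, so the remaining items in the source channel are never consumed.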