#core-async
2022-10-12
vlaaad09:10:59

Is this the correct way to concat channels?

(defn- concat-chs [chs]
  (let [ch (a/chan)]
    (a/pipeline-async 1 ch a/pipe (a/to-chan! chs))
    ch))
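
A quick way to check the ordering, as a sketch: `pipeline-async` is documented to return outputs in order relative to its inputs, and `a/pipe` closes the result channel once its source closes, which is what `pipeline-async` expects of its async function. The input channels below are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(defn- concat-chs [chs]
  (let [ch (a/chan)]
    ;; each input value is itself a channel; a/pipe copies it into the
    ;; per-job result channel and closes that channel when the source ends
    (a/pipeline-async 1 ch a/pipe (a/to-chan! chs))
    ch))

;; collect everything from the concatenated channel into a vector
(def concat-result
  (a/<!! (a/into [] (concat-chs [(a/to-chan! [1 2 3])
                                 (a/to-chan! [4 5])
                                 (a/to-chan! [6])]))))
```

Output order follows the order of `chs`, but later channels may be read a value or so ahead before earlier ones are fully drained, so this is probably not a good fit if taking from a channel early has side effects.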

Ben Sless11:10:05

Is order important?

vlaaad12:10:40

very important

Ben Sless16:10:43

For 2 channels, something like

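(defn concat2
  [x y]
  (let [ch (a/chan)]
    (a/go
      ;; drain x first; stop early if ch has been closed
      (loop []
        (let [a (a/<! x)]
          (if (nil? a)
            nil
            (when (a/>! ch a)
              (recur)))))
      ;; then drain y and close ch once y is exhausted
      (loop []
        (let [a (a/<! y)]
          (if (nil? a)
            (a/close! ch)
            (when (a/>! ch a)
              (recur))))))
    ;; return the output channel, not the go block's channel
    ch))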
Then reduce over n channels?
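
A minimal sketch of what that reduction might look like. It assumes a two-channel `concat2` that returns its output channel (redefined here in compact form so the block is self-contained); `concat-all` is a hypothetical name, and the closed channel used as the initial value is one possible way to handle an empty input.

```clojure
(require '[clojure.core.async :as a])

(defn concat2
  "Returns a channel that yields everything from x, then everything from y."
  [x y]
  (let [ch (a/chan)]
    (a/go
      (loop [] (when-some [v (a/<! x)] (when (a/>! ch v) (recur))))
      (loop [] (when-some [v (a/<! y)] (when (a/>! ch v) (recur))))
      (a/close! ch))
    ch))

(defn concat-all
  "Folds concat2 over chs, starting from an already-closed (empty) channel."
  [chs]
  (reduce concat2 (doto (a/chan) a/close!) chs))

;; collect the concatenation of three illustrative channels
(def concat-all-result
  (a/<!! (a/into [] (concat-all [(a/to-chan! [1 2])
                                 (a/to-chan! [3 4])
                                 (a/to-chan! [5 6])]))))
```

Each step of the reduction adds one go block and one extra channel hop, so values from the first channel pass through every intermediate channel; for a large number of channels a single go loop over the whole sequence would likely be cheaper.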

vlaaad12:10:17

What would that look like?

vlaaad12:10:02

Is it correct that I should never use `put!`/`take!` and should always use `>!`/`<!` in go blocks, because `put!`/`take!` might throw while `>!`/`<!` will park on backpressure and therefore have a better chance of recovering?
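
For reference, a small sketch of the failure mode in question. It assumes the usual core.async limit of 1024 pending puts per channel and default assertion settings; `try-puts` is a hypothetical helper.

```clojure
(require '[clojure.core.async :as a])

(defn try-puts
  "Calls put! n times on ch. Returns :ok, or the AssertionError message
  if the pending-put limit is exceeded."
  [ch n]
  (try
    (dotimes [i n] (a/put! ch i))
    :ok
    (catch AssertionError e
      (.getMessage e))))

(def put-results
  ;; unbuffered channel with no consumer: every put! stays pending
  (let [ch (a/chan)]
    [(try-puts ch 1024)   ; fills the pending-put queue
     (try-puts ch 1)]))   ; one more is rejected on the calling thread
```

A parked `>!` is also a pending put, so the same limit should apply to many go blocks writing to one channel. The practical difference is that a single go loop using `>!` waits for room instead of piling up puts.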

vlaaad13:10:51

Error handling could be better… core.async:

(<!! (a/reduce + 0 (a/to-chan! [1 2 3 4 5 "" 6])))
=> nil
manifold:
@(stream/reduce + 0 (stream/->source [1 2 3 4 5 "" 6]))
throws=> ClassCastException...
What should the approach to error handling be if I use core.async? Wrap everything in try/catch? Shouldn't async functions like `a/reduce`, `a/map`, etc. accept an error handler like `a/chan` does?
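
One possible workaround, sketched under the assumption that returning the exception as a value is acceptable: wrap the reducing function so a thrown exception ends the reduction and becomes the result, instead of being lost inside the go block. `catching` is a hypothetical helper, not part of core.async.

```clojure
(require '[clojure.core.async :as a])

(defn catching
  "Wraps reducing fn f so that an exception ends the reduction early
  (via reduced) and is returned as the final value."
  [f]
  (fn [acc x]
    (try
      (f acc x)
      (catch Throwable t
        (reduced t)))))

;; same input as above: the "" makes + throw a ClassCastException
(def reduce-result
  (a/<!! (a/reduce (catching +) 0 (a/to-chan! [1 2 3 4 5 "" 6]))))

;; the caller decides what to do with the error value
(def reduce-outcome
  (if (instance? Throwable reduce-result)
    :failed
    reduce-result))
```

Since the reduction stops at the first error, the rest of the source channel is left unconsumed; whether that matters depends on what is feeding it.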