core-async

vlaaad 2022-10-12T09:22:59.997079Z

Is this the correct way to concatenate channels?

(defn- concat-chs [chs]
  (let [ch (a/chan)]
    (a/pipeline-async 1 ch a/pipe (a/to-chan! chs))
    ch))

Ben Sless 2022-10-12T11:57:49.252749Z

Merge?

Ben Sless 2022-10-12T11:58:05.415449Z

Is order important?

vlaaad 2022-10-12T12:36:40.391589Z

very important

Ben Sless 2022-10-12T16:14:43.326389Z

For 2 channels, something like

(defn concat2
  [x y]
  (let [ch (a/chan)]
    (a/go
      ;; drain x first
      (loop []
        (let [a (a/<! x)]
          (if (nil? a)
            nil
            (when (a/>! ch a)
              (recur)))))
      ;; then drain y and close the output
      (loop []
        (let [a (a/<! y)]
          (if (nil? a)
            (a/close! ch)
            (when (a/>! ch a)
              (recur))))))
    ;; return the output channel, not the go block's channel
    ch))

Then reduce over n channels?
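
For n channels, one option is a single go block that drains each input in turn instead of chaining `concat2` calls. The following is only a sketch: `concat-all` is a hypothetical name, not part of core.async, and it assumes every input channel is eventually closed by its producer.

```clojure
(require '[clojure.core.async :as a])

(defn concat-all
  "Hypothetical helper: returns a channel that yields every value from
  each channel in chs, in order, and closes once the last input closes."
  [chs]
  (let [out (a/chan)]
    (a/go-loop [[ch & more] (seq chs)]
      (if ch
        (do
          ;; copy values from the current input until it closes,
          ;; or stop early if out was closed by the consumer
          (loop []
            (when-some [v (a/<! ch)]
              (when (a/>! out v)
                (recur))))
          (recur more))
        ;; no inputs left
        (a/close! out)))
    out))

;; usage
(a/<!! (a/into [] (concat-all [(a/to-chan! [1 2]) (a/to-chan! [3]) (a/to-chan! [4 5])])))
```

Unlike `(reduce concat2 chs)`, this uses one go block and one output channel regardless of how many inputs there are.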

Alex Miller (Clojure team) 2022-10-12T11:55:03.626899Z

pipe ?

vlaaad 2022-10-12T12:37:17.427279Z

What would that look like?

vlaaad 2022-10-12T12:42:02.354919Z

Is it correct that I should never use `put!`/`take!` and always use `>!`/`<!` in go blocks, because `put!`/`take!` might throw while `>!`/`<!` park on backpressure and therefore have a better chance of recovering?

vlaaad 2022-10-12T13:50:51.631139Z

Error handling could be better… core.async:

(<!! (a/reduce + 0 (a/to-chan! [1 2 3 4 5 "" 6])))
=> nil

manifold:

@(stream/reduce + 0 (stream/->source [1 2 3 4 5 "" 6]))
throws=> ClassCastException...

What should the approach to error handling be if I use core.async? Wrap everything in try/catch? Shouldn’t async functions like `a/reduce`, `a/map`, etc. accept an error handler like `a/chan` does?
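
One possible shape for the "wrap it in try/catch" approach is to catch the exception from the reducing function and put it on the result channel as a value, so the consumer sees the failure instead of a bare `nil`. This is a sketch only: `safe-reduce` is a hypothetical helper, not a core.async function, and it does not handle `reduced` values the way `a/reduce` does.

```clojure
(require '[clojure.core.async :as a])

(defn safe-reduce
  "Hypothetical helper: like a/reduce, but if f throws, the returned
  channel yields the Throwable instead of closing without a value."
  [f init ch]
  (a/go-loop [acc init]
    (if-some [v (a/<! ch)]
      ;; the try wraps only the call to f, so no parking happens inside it
      (let [r (try (f acc v) (catch Throwable t t))]
        (if (instance? Throwable r)
          r            ; stop and hand the error to the consumer
          (recur r)))
      acc)))

;; usage: the caller checks whether the result is an exception
(let [r (a/<!! (safe-reduce + 0 (a/to-chan! [1 2 3 4 5 "" 6])))]
  (if (instance? Throwable r)
    (println "failed:" (class r))
    (println "sum:" r)))
```

Note that after an error the sketch stops reading from the input, so any values still waiting on that channel are left unconsumed.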