I've found myself in need of an async/merge equivalent for csp chans. I've tried copying the the core async implementation, and it works for a toy example but doesn't work in my actual application, so I'm going to paste my translation here in case I've made an obvious error:
(defn merge
[chans & {:keys [buf xf exh exc] :as opts}]
(let [out (csp/chan opts)]
(csp/go-loop [cs (vec chans)]
(if (pos? (count cs))
(let [[v ch] (csp/alts cs)]
(if (nil? v)
(recur (filterv #(not= ch %) cs))
(do (csp/put out v)
(recur cs))))
(csp/close! out)))
out))When I try it in anger something ends up blocking, but that could just as well be a problem in my application, just want to see if there are obvious problems with merge here.