Fork me on GitHub
#funcool
<
2024-03-16
>
winsome03:03:18

I've found myself in need of an async/merge equivalent for csp chans. I've tried copying the the core async implementation, and it works for a toy example but doesn't work in my actual application, so I'm going to paste my translation here in case I've made an obvious error:

(defn merge
  [chans & {:keys [buf xf exh exc] :as opts}]
  (let [out (csp/chan opts)]
    (csp/go-loop [cs (vec chans)]
      (if (pos? (count cs))
        (let [[v ch] (csp/alts cs)]
          (if (nil? v)
            (recur (filterv #(not= ch %) cs))
            (do (csp/put out v)
                (recur cs))))
        (csp/close! out)))
    out))

winsome03:03:59

When I try it in anger something ends up blocking, but that could just as well be a problem in my application, just want to see if there are obvious problems with merge here.