@leonoel I added an example of converting a core.async channel to a flow to the wiki, but then realized that it only works for Clojure (not ClojureScript). I have another version, and wondered if you could have a look and let me know if it looks OK:

(defn- take! [chan]
  (let [cancelled? (atom false)]
    (fn [succ rejec]
      (a/take! chan
        (fn [r]
          (cond
            (instance? #?(:clj Exception :cljs js/Error) r) (rejec r)

            (or (nil? r) @cancelled?) nil

            :else (succ r))))
      (fn []
        (reset! cancelled? true)
        (rejec (ex-info "Cancelled" {:cancelled? true}))))))

(defn channel-flow [ch-recv]
  (forever (take! ch-recv)))


I would start with this:

(defn >!
  "Puts given value on given channel, returns a task completing with true when put is accepted, or false if port was closed."
  [c x] (doto (m/dfv) (->> (a/put! c x))))
(defn <!
  "Takes from given channel, returns a task completing with value when take is accepted, or nil if port was closed."
  [c] (doto (m/dfv) (->> (a/take! c))))
Then use sp to add error checks.
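A minimal sketch of what such an error check could look like, assuming the convention from the earlier snippet that errors travel over the channel as exception values. The name `<!?` is hypothetical and not part of missionary; the check is JVM-only as written (in a `.cljc` file it would use `#?(:clj Throwable :cljs js/Error)` instead).

```clojure
(require '[clojure.core.async :as a]
         '[missionary.core :as m])

(defn <!
  "Takes from given channel, returns a task completing with value when take is accepted, or nil if port was closed."
  [c] (doto (m/dfv) (->> (a/take! c))))

(defn <!?
  "Hypothetical helper: like <!, but the task fails when the value taken
  from the channel is an exception."
  [c]
  (m/sp
    ;; (<! c) is called inside the sp body, so the take only happens
    ;; when the task is actually run.
    (let [x (m/? (<! c))]
      (if (instance? Throwable x)
        (throw x) ; a throw inside sp makes the task fail with x
        x))))
```

Because `sp` takes care of the task protocol, there is no success or failure callback to call by hand here.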


That's beautiful, thanks!


Specifically, I'm not sure about the reject call inside the cancel handler - is that needed?


it is required that either the success or the failure callback is called exactly once, and this implementation doesn't enforce that


implementing tasks and flows by hand is not trivial, you should always favor using existing operators if possible (if not possible, I'd like to know more)
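As an illustration of building on existing operators, here is a sketch of a channel-to-flow conversion that only composes `dfv`, `ap`, `seed` and `eduction`. It assumes a recent missionary release where those names exist, and it assumes that a `nil` take (closed channel) should end the flow; neither assumption comes from this thread.

```clojure
(require '[clojure.core.async :as a]
         '[missionary.core :as m])

(defn <!
  "Takes from given channel, returns a task completing with value when take is accepted, or nil if port was closed."
  [c] (doto (m/dfv) (->> (a/take! c))))

(defn channel-flow
  "Sketch: discrete flow of the values taken from channel c,
  terminating when c is closed."
  [c]
  (->> (m/ap
         ;; Fork over an endless sequence so the body runs repeatedly,
         ;; one iteration at a time.
         (m/?> (m/seed (repeat nil)))
         ;; A fresh take (and a fresh dfv) is created on every iteration.
         (m/? (<! c)))
       ;; A nil take means the channel was closed: stop the flow there.
       (m/eduction (take-while some?))))
```

It can be consumed like any other flow, for example with `(m/? (m/reduce conj (channel-flow c)))` on the JVM.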


Thanks for the help! Using dfv as callback is something I didn't think about - seems very useful.