#missionary
2021-10-15
mjmeintjes02:10:23

@leonoel I added an example of converting a core.async channel to a flow to the wiki, but have now realized that it only works for Clojure (not ClojureScript). I have another version, and wondered if you could have a look and let me know if it looks OK:

(defn- take! [chan]
  (let [cancelled? (atom false)]
    (fn [succ rejec]
      (async/take!
       chan
       (fn [r]
         (cond
           (instance? #?(:clj Exception :cljs js/Error) r) (rejec r)

           (or (nil? r) @cancelled?) nil

           :else (succ r))))
      (fn []
        (reset! cancelled? true)
        (rejec (ex-info "Cancelled" {:cancelled? true}))))))

(defn channel-flow [ch-recv]
  (forever (take! ch-recv)))

leonoel08:10:39

I would start with this:

(defn >!
  "Puts given value on given channel, returns a task completing with true when put is accepted, or false if port was closed."
  [c x] (doto (m/dfv) (->> (a/put! c x))))
(defn <!
  "Takes from given channel, returns a task completing with value when take is accepted, or nil if port was closed."
  [c] (doto (m/dfv) (->> (a/take! c))))
Then use sp to add error checks
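
One way the suggested error check might look, as a minimal sketch rather than the author's actual code. The helper name `<!-checked` is hypothetical, and the `Throwable` test is Clojure-only (a ClojureScript build would test against js/Error instead, as in the first snippet of this thread). `<!` is repeated so the block stands alone.

```clojure
(require '[missionary.core :as m]
         '[clojure.core.async :as a])

;; Same definition as above: the dataflow variable is handed to take! as its
;; callback, so the taken value (or nil on a closed channel) is assigned to it.
(defn <!
  [c] (doto (m/dfv) (->> (a/take! c))))

;; Hypothetical helper: runs the take inside sp and rethrows any exception
;; that was put on the channel as a plain value, so the task fails with it.
;; Clojure-only check; ClojureScript would use (instance? js/Error r).
(defn <!-checked
  [c]
  (m/sp
    (let [r (m/? (<! c))]
      (if (instance? Throwable r)
        (throw r)
        r))))
```

Because the result is built from `sp` and `dfv`, the exactly-once callback contract is handled by missionary itself. Note that cancelling this task only stops waiting on the dataflow variable; the pending take on the channel is not withdrawn, so a value arriving afterwards is still consumed.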

mjmeintjes01:10:24

That's beautiful, thanks!

mjmeintjes02:10:38

Specifically, I'm not sure about the reject call inside the cancel handler - is that needed?

leonoel16:10:41

it is required that either success or failure callback is called exactly once, and this implementation doesn't enforce that

leonoel16:10:37

implementing tasks and flows by hand is not trivial, you should always favor using existing operators if possible (if not possible, I'd like to know more)
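
As an illustration of favoring existing operators, the channel-to-flow conversion from the start of this thread could be sketched with `ap` on top of the `<!` task, instead of a hand-written task. This is an assumption-laden sketch: it presumes a missionary release that provides `m/amb` (older releases name the sequential variant `m/amb>`), and it treats a nil take as the end of the flow.

```clojure
(require '[missionary.core :as m]
         '[clojure.core.async :as a])

;; Task completing with the next value from the channel, or nil once closed.
(defn <!
  [c] (doto (m/dfv) (->> (a/take! c))))

;; Hypothetical rewrite of channel-flow: emits each value taken from the
;; channel in order and terminates when the channel is closed.
(defn channel-flow
  [ch]
  (m/ap
    (loop []
      (if-some [x (m/? (<! ch))]
        (m/amb x (recur))   ; emit x, then go back for the next value
        (m/amb)))))         ; closed channel: end the flow without emitting
```

On the JVM the flow can be tried out by reducing it into a vector, for example `(m/? (m/reduce conj [] (channel-flow ch)))` on a channel that is eventually closed.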

mjmeintjes02:10:05

Thanks for the help! Using dfv as callback is something I didn't think about - seems very useful.