Hi, I'm new to missionary and trying to understand the abstractions. I'm working on creating a flow around a JS https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator, and using m/observe for it. Does this approach look ok or is there a better way to do it? Also, I'd like for the resulting flow to consume the next iterator lazily, is this possible? thanks
// src/main/starter/asyncgen.js
export async function* abc() {
yield "a";
yield "b";
yield "c";
}
;; src/main/starter/agen.cljs
(ns starter.agen
(:require [missionary.core :as m]
["./asyncgen" :as ag]))
(def >agen
(m/observe
(fn [emit!]
(let [it (ag/abc)]
(.then (.next it) (fn handler [item]
(let [{:keys [done value]} (js->clj item :keywordize-keys true)]
(if done
(emit! (reduced :done))
(do (emit! value)
(.then (.next it) handler))))))))))
((m/reduce (fn [_ v]
(if (reduced? v)
v
(println :v v)))
nil >agen)
#(println :success %) #(println :failure %))
;; => :v a
;; => :v b
;; => :v c
;; => :success :done
(defn promise->task [p]
(let [v (m/dfv)]
(.then p
(fn [x] (v #(do x)))
(fn [e] (v #(throw e))))
(m/absolve v)))
(defn agen->flow [ctor]
(m/ap
(let [it (ctor)]
(loop []
(let [r (m/? (promise->task (.next it)))]
(if (.-done r)
(m/amb)
(m/amb (.-value r)
(recur))))))))
This pattern is more correct because the resulting flow is backpressured - ap waits for the previous value to be consumed before calling next result. observe doesn't do that, if the consumer is slower than the producer then the callback will eventually throw an exception.Thanks, this is what I was looking for. It also answers my next question, which would have been how to create a promise into a task.
Could I ask what m/absolve does? I can't fully understand the docstring. It seems like it should work by simply returning the dataflow variable?
m/absolve: Returns a task running given task completing with a zero-argument function and completing with the result of this function call
see https://github.com/leonoel/missionary/discussions/61 where Dustin links to a gist explaining the promise->task implementation
Thanks @xifi, this was quite illuminating!