missionary

chaos 2024-05-08T06:33:31.283429Z

Hi, I'm new to missionary and trying to understand the abstractions. I'm working on creating a flow around a JS https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator, and using m/observe for it. Does this approach look ok or is there a better way to do it? Also, I'd like for the resulting flow to consume the next iterator lazily, is this possible? thanks

// src/main/starter/asyncgen.js
export async function* abc() {
  yield "a";
  yield "b";
  yield "c";
}
;; src/main/starter/agen.cljs
(ns starter.agen
  (:require [missionary.core :as m]
            ["./asyncgen" :as ag]))

(def >agen 
  (m/observe
   (fn [emit!]
     (let [it (ag/abc)]
       (.then (.next it) (fn handler [item]
                           (let [{:keys [done value]} (js->clj item :keywordize-keys true)]
                             (if done
                               (emit! (reduced :done))
                               (do (emit! value)
                                   (.then (.next it) handler))))))))))


((m/reduce (fn [_ v]
             (if (reduced? v)
               v
               (println :v v)))
           nil >agen)
 #(println :success %) #(println :failure %))
;; => :v a
;; => :v b
;; => :v c
;; => :success :done

leonoel 2024-05-08T18:16:39.807529Z

(defn promise->task [p]
  (let [v (m/dfv)]
    (.then p
      (fn [x] (v #(do x)))
      (fn [e] (v #(throw e))))
    (m/absolve v)))

(defn agen->flow [ctor]
  (m/ap
    (let [it (ctor)]
      (loop []
        (let [r (m/? (promise->task (.next it)))]
          (if (.-done r)
            (m/amb)
            (m/amb (.-value r)
              (recur))))))))
This pattern is more correct because the resulting flow is backpressured - ap waits for the previous value to be consumed before calling next result. observe doesn't do that, if the consumer is slower than the producer then the callback will eventually throw an exception.

chaos 2024-05-09T06:49:08.543239Z

Thanks, this is what I was looking for. It also answers my next question, which would have been how to create a promise into a task. Could I ask what m/absolve does? I can't fully understand the docstring. It seems like it should work by simply returning the dataflow variable? m/absolve: Returns a task running given task completing with a zero-argument function and completing with the result of this function call

xificurC 2024-05-09T07:10:50.893549Z

see https://github.com/leonoel/missionary/discussions/61 where Dustin links to a gist explaining the promise->task implementation

chaos 2024-05-09T17:28:33.365989Z

Thanks @xifi, this was quite illuminating!