missionary

Andrew Wilcox 2024-11-17T02:40:43.908449Z

In an m/ap I need to output a sequence of values vs. Is there a more succinct way to do this?

(loop [vs vs]
  (if (empty? vs)
    (m/amb)
    (m/amb (first vs) (recur (next vs)))))

leonoel 2024-11-17T08:48:46.785709Z

(m/?> (m/seed vs))

👍 1
maybenot 2024-11-17T04:49:11.874649Z

Can someone help me understand what's happening with sample?

maybenot 2024-11-17T04:49:17.293589Z

This is a slightly modifed snippet from the docs

(m/? (->> (m/sample vector
                    (m/reductions {} 0 (sleep-emit [1000 100]))
                    (sleep-emit [2000 100]))
          (m/reduce (fn [_ x] (prn (str "received: " x))) nil)))  

Sometimes it behaves expected (at least for me) and returns
; "received: [1000 2000]"
; "received: [100 100]"

but sometimes (often) I get this
; "received: [1000 2000]"
; "received: [1000 100]"

As if the sampled flow is stucked until the sampler returns first result I'd expect sampled flow to run concurrently when the sampler is running, or am I missing something?

leonoel 2024-11-17T09:03:29.406899Z

The sampled flow does run concurrently with the sampler, but the transfer on sampled flow is lazy (i.e. it waits for the sampling event)

leonoel 2024-11-17T09:06:05.307629Z

In this example after the 2000 ms event, both sampler and sampled are transferred, then the two 100 ms delays are scheduled, depending on which comes first you may observe one state or the other

leonoel 2024-11-17T09:08:12.386859Z

If you add a relieve stage the sampled flow will be transferred ASAP, which may be the behavior you expected

(m/? (->> (m/sample vector
                    (m/relieve (m/reductions {} 0 (sleep-emit [1000 100])))
                    (sleep-emit [2000 100]))
          (m/reduce (fn [_ x] (prn (str "received: " x))) nil)))

1
maybenot 2024-11-17T09:31:26.859579Z

Oh, right, now I see, thank you

whilo 2024-11-17T20:18:17.651749Z

I am experimenting with

((let [flow (m/observe (fn [!] (defn send! [m] (! m)) #()))]
     (m/ap (prn "received" (m/?> ##Inf flow))))
   prn prn)
But get Cannot read field "queue" because "b" is null, which is not very helpful. The stacktrace never hits any of my code or missionary or cloroutine unfortunately (core.async is also bad at this, but at least it points at the go block). When I use m/seed for a flow the example works as intended.

leonoel 2024-11-18T09:56:32.800769Z

the first doesn't respect backpressure either, ! blocks the thread when the pipeline is not ready

leonoel 2024-11-18T10:01:43.294649Z

more accurately, it does respect backpressure but not the right way (blocking in core.async's thread pool)

leonoel 2024-11-18T10:14:34.300589Z

I can't reproduce this error. When is it printed ?

leonoel 2024-11-18T16:49:41.351929Z

Check these helpers for correct backpressure propagation https://github.com/hyperfiddle/electric/blob/v2/src/contrib/missionary_core_async.cljc

whilo 2024-11-18T17:45:27.411249Z

Thank you. The other direction, from (discrete) flow to chan is missing there. I am also wondering whether one could just extend the ManyToManyChannel instances (and maybe a few other core.async) types to automatically be flows.

whilo 2024-11-18T17:46:14.427599Z

The error is printed on the REPL when I evaluated this form. Maybe something is off in my project, I will try to run it in a different one and also outside of a REPL.

whilo 2024-11-19T08:15:21.162419Z

Interesting, it seems the original error is a VS Code/Calva issue, if I connect from Emacs the code seems to work. Weird...

xificurC 2024-11-19T08:34:19.879159Z

a possibly related problem to watch out for - editors like to deref things for printing. Missionary processes side effect on deref

whilo 2024-11-19T08:38:18.042759Z

I see.

whilo 2024-11-19T08:40:39.154189Z

Indeed if I put it in a do bock and return nil afterwards it works in VS Code.

whilo 2024-11-19T08:41:19.005799Z

Thx @xifi 🙂

😉 1
whilo 2024-11-17T20:22:17.738749Z

This is the stacktrace, maybe it is a bug with the REPL:

clojure.main/repl (main.clj:442)
clojure.main/repl (main.clj:458)
clojure.main/repl (main.clj:368)
nrepl.middleware.interruptible-eval/evaluate (interruptible_eval.clj:84)
nrepl.middleware.interruptible-eval/evaluate (interruptible_eval.clj:56)
nrepl.middleware.interruptible-eval/interruptible-eval (interruptible_eval.clj:152)
nrepl.middleware.session/session-exec (session.clj:218)
nrepl.middleware.session/session-exec (session.clj:217)
java.lang.Thread/run (Thread.java:1583)

whilo 2024-11-18T01:18:46.344109Z

To hook into my existing core.async code I would like to have something like:

(defn chan->flow [ch]
    (m/observe (fn [!]
                 (go-loop [m (<! ch)]
                   (when m
                     (! m)
                     (recur (<! ch))))
                 #())))

  (defn flow->chan [flow]
    (let [ch (chan)]
      ((m/reduce #(put! ch %2) nil flow)
       prn prn)
      ch))

whilo 2024-11-18T01:19:09.809449Z

The first doesn't work with this error, the second works, but does not respect backpressure.

whilo 2024-11-18T01:21:16.304309Z

I think it would be very nice to have such functions provided to make it easy for people who use other streaming systems in Clojure to explore missionary inside their existing systems without first having to figure out how to glue things together. Error handling is also not properly dealt with here, exceptions should go into the external system and probably also passed differently to missionary.

whilo 2024-11-18T02:14:27.185209Z

I can try to provide them in some form once I have figured things out.