In an m/ap I need to output a sequence of values vs. Is there a more succinct way to do this?
(loop [vs vs]
(if (empty? vs)
(m/amb)
(m/amb (first vs) (recur (next vs)))))(m/?> (m/seed vs))
Can someone help me understand what's happening with sample?
This is a slightly modifed snippet from the docs
(m/? (->> (m/sample vector
(m/reductions {} 0 (sleep-emit [1000 100]))
(sleep-emit [2000 100]))
(m/reduce (fn [_ x] (prn (str "received: " x))) nil)))
Sometimes it behaves expected (at least for me) and returns
; "received: [1000 2000]"
; "received: [100 100]"
but sometimes (often) I get this
; "received: [1000 2000]"
; "received: [1000 100]"
As if the sampled flow is stucked until the sampler returns first result
I'd expect sampled flow to run concurrently when the sampler is running, or am I missing something?The sampled flow does run concurrently with the sampler, but the transfer on sampled flow is lazy (i.e. it waits for the sampling event)
In this example after the 2000 ms event, both sampler and sampled are transferred, then the two 100 ms delays are scheduled, depending on which comes first you may observe one state or the other
If you add a relieve stage the sampled flow will be transferred ASAP, which may be the behavior you expected
(m/? (->> (m/sample vector
(m/relieve (m/reductions {} 0 (sleep-emit [1000 100])))
(sleep-emit [2000 100]))
(m/reduce (fn [_ x] (prn (str "received: " x))) nil)))Oh, right, now I see, thank you
I am experimenting with
((let [flow (m/observe (fn [!] (defn send! [m] (! m)) #()))]
(m/ap (prn "received" (m/?> ##Inf flow))))
prn prn)
But get Cannot read field "queue" because "b" is null, which is not very helpful. The stacktrace never hits any of my code or missionary or cloroutine unfortunately (core.async is also bad at this, but at least it points at the go block). When I use m/seed for a flow the example works as intended.the first doesn't respect backpressure either, ! blocks the thread when the pipeline is not ready
more accurately, it does respect backpressure but not the right way (blocking in core.async's thread pool)
I can't reproduce this error. When is it printed ?
Check these helpers for correct backpressure propagation https://github.com/hyperfiddle/electric/blob/v2/src/contrib/missionary_core_async.cljc
Thank you. The other direction, from (discrete) flow to chan is missing there. I am also wondering whether one could just extend the ManyToManyChannel instances (and maybe a few other core.async) types to automatically be flows.
The error is printed on the REPL when I evaluated this form. Maybe something is off in my project, I will try to run it in a different one and also outside of a REPL.
Interesting, it seems the original error is a VS Code/Calva issue, if I connect from Emacs the code seems to work. Weird...
a possibly related problem to watch out for - editors like to deref things for printing. Missionary processes side effect on deref
I see.
Indeed if I put it in a do bock and return nil afterwards it works in VS Code.
This is the stacktrace, maybe it is a bug with the REPL:
clojure.main/repl (main.clj:442)
clojure.main/repl (main.clj:458)
clojure.main/repl (main.clj:368)
nrepl.middleware.interruptible-eval/evaluate (interruptible_eval.clj:84)
nrepl.middleware.interruptible-eval/evaluate (interruptible_eval.clj:56)
nrepl.middleware.interruptible-eval/interruptible-eval (interruptible_eval.clj:152)
nrepl.middleware.session/session-exec (session.clj:218)
nrepl.middleware.session/session-exec (session.clj:217)
java.lang.Thread/run (Thread.java:1583)To hook into my existing core.async code I would like to have something like:
(defn chan->flow [ch]
(m/observe (fn [!]
(go-loop [m (<! ch)]
(when m
(! m)
(recur (<! ch))))
#())))
(defn flow->chan [flow]
(let [ch (chan)]
((m/reduce #(put! ch %2) nil flow)
prn prn)
ch))The first doesn't work with this error, the second works, but does not respect backpressure.
I think it would be very nice to have such functions provided to make it easy for people who use other streaming systems in Clojure to explore missionary inside their existing systems without first having to figure out how to glue things together. Error handling is also not properly dealt with here, exceptions should go into the external system and probably also passed differently to missionary.
I can try to provide them in some form once I have figured things out.