This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-03-08
Hi, really enjoying missionary. Just got some new questions:
(require '[missionary.core :as m])
(defn stop-on [v]
  (fn [rf]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (if (= i v)
         (reduced (rf r i))
         (rf r i))))))
(defn run-reduce! [flow]
  (m/? (m/reduce (fn [_ x] (prn :> x)) nil flow)))
(defn f-seq [mark n]
  (->> (m/observe (fn [!]
                    ((m/via m/blk (doseq [i (range n)] (! i))) {} {})
                    (fn [] (prn :stop mark))))
       ;; q1: observe's docs seem to show that it cannot terminate the
       ;; generated flow itself, is that true?
       ;;
       ;; q2: I can do early termination with eduction or reductions with
       ;; `reduced`. Is this the only way to do early termination?
       (m/eduction (stop-on (- n 1)))))
;;; q3: testing on the JVM, m/? blocks & runs the `reduce` task, and the task
;;; stops when the upstream flow stops. Is this what I should be expecting?
;;; (For this example, the task does terminate.)
(run-reduce! (f-seq :t 5))
;;; q3 follow-up: with an `ap` & forking, this one also terminates
(run-reduce! (m/ap (m/?> (f-seq :t 5))))
;;; these two are why I raised q3: when f0 is less than 5, the task
;;; terminates, but when it is >= 5, the task blocks
;;;
;;; q4: when does ap stop propagating? Is it just like observe, where we need
;;; to manually stop it with eduction/reductions, or does it stop when all
;;; dependency flows stop?
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
q1, q2: yes, however observe self-termination is under consideration https://github.com/leonoel/missionary/issues/6
q4: `ap` terminates when all its dependencies terminate. It is also possible to cancel it with e.g. a downstream `reduced`, the cancellation will propagate to its deps
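To illustrate the q2 answer, here is a minimal sketch of early termination through a downstream `reduced`. It swaps the `m/observe` producer for `m/seed` (a flow over a plain collection) so there is no extra thread involved; that substitution is an assumption made for the sake of a small example, not what the original snippet does.

```clojure
(require '[missionary.core :as m])

;; Same transducer as in the question: pass values through and
;; signal `reduced` once the value v has been seen.
(defn stop-on [v]
  (fn [rf]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (if (= i v)
         (reduced (rf r i))
         (rf r i))))))

;; Plain Clojure: the transducer cuts the reduction short after 3.
(def plain-result (into [] (stop-on 3) (range 10)))

;; Missionary: m/eduction applies the same transducer to a flow.
;; When the transducer returns `reduced`, the eduction flow terminates
;; and cancels its upstream, so the remaining values are never consumed.
(def flow-result
  (m/? (m/reduce conj [] (m/eduction (stop-on 3) (m/seed (range 10))))))
```

`plain-result` is `[0 1 2 3]`, and `flow-result` is expected to match it.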
Thanks! Then the question remains for the last one. Why does it block (while the second-to-last one terminates)?
Those two:
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
If you print the result of the producer you can get a hint
((m/via m/blk (doseq [i (range n)] (! i)))
 (fn [_] (prn :producer-done))
 (fn [e] (prn :producer-error e)))
The producer thread (from `m/blk`) crashes with "Can't process event - observer is not ready.". This is a backpressure error. The producer thread sends values as quickly as possible, competing with the consumer thread (from the repl), which is also receiving as quickly as possible. `m/observe` is not backpressured, so eventually the producer sends a value while the consumer thread is still processing the previous one, resulting in an exception thrown by `!`. At this point, `m/observe` stops producing and never reaches the target value, which is `(- n 1)`, i.e. 4 for n = 5.
You can insert a sleep between sends, to emulate a slow producer: (doseq [i (range n)] (Thread/sleep 10) (! i))
Also under consideration - block the producer thread instead of crashing in this case https://github.com/leonoel/missionary/issues/93
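Putting the two suggestions above together (printing callbacks plus a sleep between sends), a slow producer could be sketched as below. `slow-seq` is a hypothetical name, and `(take n)` stands in for the `stop-on` transducer to keep the snippet self-contained. Whether 10 ms is slow enough depends on how long the consumer needs per value, so treat this as an illustration of the idea rather than a guaranteed fix.

```clojure
(require '[missionary.core :as m])

;; Hypothetical variant of f-seq: the producer sleeps between sends and
;; reports its own outcome instead of discarding it.
(defn slow-seq [mark n]
  (->> (m/observe
         (fn [!]
           ;; Run the producer on the blocking executor. The two callbacks
           ;; print whether the producer finished or crashed.
           ((m/via m/blk
              (doseq [i (range n)]
                (Thread/sleep 10)   ; emulate a slow producer
                (! i)))
            (fn [_] (prn :producer-done mark))
            (fn [e] (prn :producer-error mark e)))
           ;; Cleanup thunk, called when the flow is cancelled.
           (fn [] (prn :stop mark))))
       ;; Terminate the flow after n values (stand-in for stop-on).
       (m/eduction (take n))))

(def slow-result (m/? (m/reduce conj [] (slow-seq :t 5))))
```

If the consumer keeps up, `slow-result` should contain the five values in order, and no `:producer-error` line should be printed.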
A couple questions about threads in missionary.
• In Clojure JVM, are we always just using OS threads?
• What is the difference between using `m/sp` and using a specified executor like `(m/via m/cpu ...)`? I understand it's supposed to be optimized for cpu bound tasks in some way, but how so?
missionary doesn't expose any primitive akin to an OS thread, but leverages OS threads to run user code.
• `sp` emulates the sequential logic and park/resume behavior of threads. It doesn't bind the computation to any specific OS thread, and the `(Thread/currentThread)` usually changes during the lifecycle of an `sp` process. There must be only fast operations in the body; expensive calls must be wrapped in tasks and called with `?`
• `m/via` does bind the computation to a single OS thread managed by the executor passed as first argument. Expensive operations are allowed, but the executor should be properly sized according to the nature of the computation: cpu-bound vs io-bound
So essentially, using expensive calls in `m/sp` requires you to wrap `m/?` around them so it can properly park the thread or give the task to another thread?
> `m/via` does bind the computation to a single OS thread managed by the executor passed as first argument.
Does this mean the task cannot be put on other threads the way something in `m/sp` could? I have noticed the docs have an example where 1000 `m/via` tasks are run. It sounds like that would be a bad idea if the tasks actually had to wait for a while.
> So essentially, using expensive calls in `m/sp` requires you to wrap `m/?` around them so it can properly park the thread or give the task to another thread?
yes, basically all operators expect synchronous calls to be fast. Not fast means asynchronous, so be explicit and use `via`. Typical pattern:
(m/sp
  (m/? (m/via m/blk (io-bound-computation)))
  (m/? (m/via m/cpu (cpu-bound-computation))))
(m/via m/blk
  (io-bound-computation)
  (m/? (m/via m/cpu (cpu-bound-computation))))
this works too, but here an OS thread is wasted waiting for the cpu task to complete
So a task in `m/via` will never be given to another thread?
In that case, is there a reason one would use `(m/via m/blk)` over just regular `m/sp` tasks? I would imagine when it comes to waiting for a task to finish, it makes more sense to park the thread than it does to have a discrete OS thread.
The purpose of `(m/via m/blk)` is to interop with blocking IO. If you need to wait for a task to finish, use `m/sp` so you don't waste an OS thread doing so.
Isn't blocking IO exactly the kind of thing I never want to waste an OS thread for? What's an example of a blocking IO I'd want to block a whole OS thread for? Seems like I should always use m/sp
Ah, so it's just for things that don't have a way to run async.
So `Thread/sleep` is an example of an IO function we'd need to use with `(m/via m/blk)`.
And we can't use expensive operations in `m/sp`, or we just shouldn't? Don't both of these run the expensive computations in parallel?
(m/? (m/join vector
       (m/sp (expensive-calculation-1))
       (m/sp (expensive-calculation-2))))
(m/? (m/join vector
       (m/via m/cpu (expensive-calculation-1))
       (m/via m/cpu (expensive-calculation-2))))
What is the former doing? Processing one at a time?
Oh yeah
yes, when a `sp` process is spawned it runs synchronously as much as it can until it encounters a `?`
Okay, so `m/join` doesn't automatically parallelize things, it relies on them having waiting logic of some kind. Which can only be done with CPU bound stuff if it's turned into the appropriate task.
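One way to see the difference discussed above is to record which thread each branch of the join runs on. This is a small sketch, assuming it is evaluated on the JVM from a thread that calls `m/?` directly (e.g. a REPL); `thread-name` is a hypothetical helper, and the trivial bodies stand in for the expensive calculations.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: name of the thread evaluating the call.
(defn thread-name []
  (.getName (Thread/currentThread)))

;; The thread that starts the tasks below.
(def caller (thread-name))

;; sp bodies with no m/? never park, so each one runs to completion
;; synchronously on the thread that starts the join, one after the other.
(def sp-threads
  (m/? (m/join vector
         (m/sp (thread-name))
         (m/sp (thread-name)))))

;; via hands each body to a thread of the m/cpu executor, so the two
;; bodies can run while the calling thread only waits for the results.
(def via-threads
  (m/? (m/join vector
         (m/via m/cpu (thread-name))
         (m/via m/cpu (thread-name)))))
```

Both entries of `sp-threads` equal `caller`, while the entries of `via-threads` name executor threads instead.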