#missionary
2024-03-08
lotuc 01:03:56

Hi, really enjoying missionary. I have a few new questions:

(require '[missionary.core :as m])

(defn stop-on [v]
  (fn [rf]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (if (= i v)
         (reduced (rf r i))
         (rf r i))))))

(defn run-reduce! [flow]
  (m/? (m/reduce (fn [_ x] (prn :> x)) nil flow)))

(defn f-seq [mark n]
  (->> (m/observe (fn [!]
                    ((m/via m/blk (doseq [i (range n)] (! i))) {} {})
                    (fn [] (prn :stop mark))))
       ;; q1: observe's doc seems to say that it cannot terminate the
       ;;     generated flow by itself. Is that true?
       ;;
       ;; q2: I can do early termination with eduction or reductions with
       ;;     `reduced`. Is this the only way to do early termination?
       (m/eduction (stop-on (- n 1)))))

;;; q3: testing on the JVM, m/? blocks and runs the `reduce` task, and the task
;;;     stops when the upstream flow stops. Is this what I should expect? (For
;;;     this example, the task does terminate.)
(run-reduce! (f-seq :t 5))
;;; q3 follow-up: with `ap` and forking, this one also terminates
(run-reduce! (m/ap (m/?> (f-seq :t 5))))

;;; these two are why I raised q3: when f0 is less than 5, the task
;;; terminates, but when it is >= 5, the task blocks
;;;
;;; q4: when does ap stop producing? Is it like observe, where we need to
;;;     stop it manually with eduction/reductions, or does it stop when
;;;     all dependency flows stop?
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))

leonoel 09:03:30

q1, q2: yes. However, observe self-termination is under consideration: https://github.com/leonoel/missionary/issues/6

leonoel 09:03:12

q3: yes, the reduce task terminates when its input flow terminates

leonoel 09:03:58

q4: ap terminates when all its dependencies terminate. It is also possible to cancel it with, e.g., a downstream reduced; the cancellation will propagate to its deps.
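A minimal sketch of both termination paths, assuming a recent missionary release on the classpath. m/seed turns a collection into a finite flow, so the ap below terminates on its own once its only dependency is exhausted; a reducing function that returns reduced ends the reduction early and cancels the ap. The names squares, all-squares and first-square-above are illustrative only.

```clojure
(require '[missionary.core :as m])

;; ap forking over a finite flow: it terminates once m/seed is exhausted
(def squares
  (m/ap (let [x (m/?> (m/seed (range 100)))]
          (* x x))))

;; Natural termination: the reduce task completes when squares completes.
(defn all-squares []
  (m/? (m/reduce conj [] squares)))

;; Early termination: returning (reduced ...) ends the reduction and
;; cancels squares, which in turn cancels the m/seed it forks over.
(defn first-square-above [limit]
  (m/? (m/reduce (fn [_ sq] (if (> sq limit) (reduced sq) nil))
                 nil squares)))

(count (all-squares))   ;; 100
(first-square-above 50) ;; 64
```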

lotuc 09:03:48

Thanks! Then the question remains for the last one: why does it block (while the second-to-last one terminates)?

leonoel 09:03:50

in which example?

lotuc 09:03:55

Those two

(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))

leonoel 10:03:39

If you print the result of the producer, you can get a hint:

((m/via m/blk (doseq [i (range n)] (! i)))
 (fn [_] (prn :producer-done))
 (fn [e] (prn :producer-error e)))

The producer thread (from m/blk) crashes with "Can't process event - observer is not ready.". This is a backpressure error. The producer thread sends values as quickly as possible, competing with the consumer thread (from the repl), also receiving as quickly as possible. m/observe is not backpressured, so eventually the producer sends a value while the consumer thread is still processing the previous one, resulting in an exception thrown by !. At this point, m/observe stops producing and never reaches the target value 4 (n - 1).

leonoel 10:03:40

You can insert a sleep between sends to emulate a slow producer: (doseq [i (range n)] (Thread/sleep 10) (! i))
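The suggestion above can be folded back into the original f-seq pattern. This is a sketch assuming a recent missionary release; slow-seq is a hypothetical name. Because m/observe never terminates by itself, a (take n) transducer in m/eduction bounds it, and the cleanup function cancels the m/via task, which interrupts its thread if it is still running. In versions where ! throws when the consumer is busy, the sleep makes the race unlikely rather than impossible.

```clojure
(require '[missionary.core :as m])

;; Hypothetical variant of f-seq: the producer sleeps between sends
;; and reports how it ends instead of ignoring the outcome.
(defn slow-seq [n]
  (m/observe
   (fn [!]
     (let [cancel ((m/via m/blk
                     (doseq [i (range n)]
                       (Thread/sleep 10) ; emulate a slow producer
                       (! i)))
                   (fn [_] (prn :producer-done))
                   (fn [e] (prn :producer-error e)))]
       ;; cleanup, run when the observe flow is cancelled:
       ;; cancelling the via task interrupts the producer thread
       (fn [] (cancel))))))

;; observe does not terminate on its own, so (take 5) ends the flow
;; after the fifth value and cancels the observe upstream
(m/? (m/reduce conj [] (m/eduction (take 5) (slow-seq 5))))
```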

leonoel 10:03:11

Also under consideration: blocking the producer thread instead of crashing in this case. https://github.com/leonoel/missionary/issues/93

Dallas Surewood 18:03:50

A couple of questions about threads in missionary:
• In Clojure JVM, are we always just using OS threads?
• What is the difference between using m/sp and using a specified executor like (m/via m/cpu ...)? I understand it's supposed to be optimized for cpu-bound tasks in some way, but how so?

leonoel 19:03:50

missionary doesn't expose any primitive akin to an OS thread, but leverages OS threads to run user code.
• sp emulates the sequential logic and park/resume behavior of threads. It doesn't bind the computation to any specific OS thread, and (Thread/currentThread) usually changes during the lifecycle of an sp process. There must be only fast operations in the body; expensive calls must be wrapped in tasks and called with ?
• m/via does bind the computation to a single OS thread managed by the executor passed as first argument. Expensive operations are allowed, but the executor should be properly sized according to the nature of the computation: cpu-bound vs io-bound.
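One way to observe this difference is to record which thread is running at each step. A minimal sketch assuming only missionary.core on the JVM; thread-name and where-am-i are illustrative names. The sp body starts on the thread that runs the task (here the caller of m/?), the via body runs on a thread owned by the m/cpu executor, and after the ? the sp typically resumes on whichever thread completed the awaited task, which is why the current thread "usually changes". Exact thread names depend on the runtime.

```clojure
(require '[missionary.core :as m])

(defn thread-name [] (.getName (Thread/currentThread)))

(def where-am-i
  (m/sp
    (let [before (thread-name)                     ; thread running the sp up to its first ?
          inside (m/? (m/via m/cpu (thread-name))) ; a thread owned by the m/cpu executor
          after  (thread-name)]                    ; whichever thread resumed the sp
      {:before before :inside inside :after after})))

;; m/? blocks the calling thread (JVM only) until the task completes
(m/? where-am-i)
```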

Dallas Surewood 19:03:49

So essentially, using expensive calls in m/sp requires you to wrap m/? around them so it can properly park the thread or give the task to another thread?
> m/via does bind the computation to a single OS thread managed by the executor passed as first argument.
Does this mean the task cannot be put on other threads like something m/sp could? I have noticed the docs have an example where 1000 m/via tasks are run. It sounds like that would be a bad idea if the tasks actually had to wait for a while.

leonoel 19:03:08

> So essentially, using expensive calls in m/sp requires you to wrap m/? around them so it can properly park the thread or give the task to another thread?
yes, basically all operators expect synchronous calls to be fast. Not fast means asynchronous, so be explicit and use via. Typical pattern:

(m/sp
  (m/? (m/via m/blk (io-bound-computation)))
  (m/? (m/via m/cpu (cpu-bound-computation))))

leonoel 19:03:57

(m/via m/blk
  (io-bound-computation)
  (m/? (m/via m/cpu (cpu-bound-computation))))

this works too, but here an OS thread is wasted waiting for the cpu task to complete

Dallas Surewood 19:03:41

So a task in m/via will never be given to another thread?

Dallas Surewood 19:03:27

In that case, is there a reason one would use (m/via m/blk) over just regular m/sp tasks? I would imagine that when it comes to waiting for a task to finish, it makes more sense to park the thread than to tie up a dedicated OS thread.

leonoel 19:03:17

The purpose of (m/via m/blk) is to interop with blocking IO. If you need to wait for a task to finish, use m/sp so you don't waste an OS thread doing so.
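A sketch of that division of labor, assuming a recent missionary release. blocking-lookup is a hypothetical stand-in for an API with no async version (it just sleeps), and fetch-twice is an illustrative name. The blocking call gets its own m/blk thread through m/via, while plain waiting uses m/sleep, which parks the sp process without holding any OS thread.

```clojure
(require '[missionary.core :as m])

;; Hypothetical blocking API with no async version.
(defn blocking-lookup [id]
  (Thread/sleep 100) ; blocks the calling OS thread
  {:id id :name (str "user-" id)})

(defn fetch-twice [id]
  (m/sp
    ;; blocking call: isolate it on an m/blk thread
    (let [a (m/? (m/via m/blk (blocking-lookup id)))]
      ;; plain waiting: m/sleep parks the sp, no thread is held
      (m/? (m/sleep 100))
      [a (m/? (m/via m/blk (blocking-lookup (inc id))))])))

(m/? (fetch-twice 1))
;; => [{:id 1, :name "user-1"} {:id 2, :name "user-2"}]
```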

Dallas Surewood 20:03:03

Isn't blocking IO exactly the kind of thing I never want to waste an OS thread on? What's an example of blocking IO I'd want to block a whole OS thread for? It seems like I should always use m/sp.

leonoel 20:03:56

you never want it, but sometimes you have no choice

leonoel 20:03:10

some APIs have no async version

Dallas Surewood 20:03:14

Ah, so it's just for things that don't have a way to run async.

leonoel 20:03:08

yes, that's the only reason

Dallas Surewood 20:03:20

So Thread/sleep is an example of an IO function we'd need to use with (m/via m/blk). And we can't use expensive operations in m/sp, or we just shouldn't? Don't both of these run the expensive computations in parallel?

(m/? (m/join vector 
       (m/sp (expensive-calculation-1))
       (m/sp (expensive-calculation-2))))

(m/? (m/join vector
       (m/via m/cpu (expensive-calculation-1))
       (m/via m/cpu (expensive-calculation-2))))

leonoel 20:03:31

no, only the latter

Dallas Surewood 20:03:01

What is the former doing? Processing one at a time?

leonoel 20:03:17

yes, when an sp process is spawned, it runs synchronously as far as it can, until it encounters a ?
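The difference can be measured directly. A sketch assuming a recent missionary release; busy (a hypothetical CPU-bound workload that spins for a given number of milliseconds) and elapsed-ms are illustrative helpers. Since the sp bodies contain no ?, m/join starts the first branch, which runs to completion before the second one starts, so the total is at least the sum of both. The via bodies each get an m/cpu thread, so on a multi-core machine they can overlap; how much depends on the hardware and load.

```clojure
(require '[missionary.core :as m])

;; Hypothetical CPU-bound work: busy-spin for ms milliseconds.
(defn busy [ms]
  (let [end (+ (System/nanoTime) (* ms 1000000))]
    (while (< (System/nanoTime) end))
    ms))

;; Run a task to completion and return the wall-clock time in ms.
(defn elapsed-ms [task]
  (let [start (System/nanoTime)]
    (m/? task)
    (quot (- (System/nanoTime) start) 1000000)))

;; No ? in the sp bodies: the branches run one after the other,
;; so this takes at least 400 ms.
(elapsed-ms (m/join vector (m/sp (busy 200)) (m/sp (busy 200))))

;; Each via body runs on an m/cpu thread: on a multi-core machine
;; the branches can overlap and finish closer to 200 ms.
(elapsed-ms (m/join vector (m/via m/cpu (busy 200)) (m/via m/cpu (busy 200))))
```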

Dallas Surewood 20:03:04

Okay, so m/join doesn't automatically parallelize things; it relies on them having waiting logic of some kind, which for CPU-bound work is only possible if it's turned into the appropriate task.

leonoel 20:03:16

correct. In fact, m/via is the only operator touching OS threads, and consequently the only one unsupported in cljs.

Dallas Surewood 21:03:52

Thank you so much for clarifying. That helps a lot
