missionary

lotuc 2024-03-08T01:15:56.576999Z

Hi, really enjoying missionary. Just got some new questions:

(require '[missionary.core :as m])

(defn stop-on [v]
  (fn [rf]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (if (= i v)
         (reduced (rf r i))
         (rf r i))))))

(defn run-reduce! [flow]
  (m/? (m/reduce (fn [_ x] (prn :> x)) nil flow)))

(defn f-seq [mark n]
  (->> (m/observe (fn [!]
                    ((m/via m/blk (doseq [i (range n)] (! i))) {} {})
                    (fn [] (prn :stop mark))))
       ;; q1: observe's doc seems to say that it cannot terminate the
       ;;     generated flow by itself. Is that true?
       ;;
       ;; q2: I can terminate early with eduction or reductions by returning
       ;;     `reduced`. Is this the only way to do early termination?
       (m/eduction (stop-on (- n 1)))))

;;; q3: testing on JVM, m/? blocks & runs the `reduce` task, the task stops when
;;;     upstream flow stops, is this what I should be expecting? (and for this
;;;     example, the task does terminate)
(run-reduce! (f-seq :t 5))
;;; q3 follow-up: with an `ap` & forking, this one also terminates
(run-reduce! (m/ap (m/?> (f-seq :t 5))))

;;; these two are why I raised q3: when f0 is less than 5, the task
;;; terminates, but when it is >= 5, the task blocks
;;;
;;; q4: when does ap stop propagating? Is it just like observe, where we need
;;;     to manually stop it with eduction/reductions, or does it stop when
;;;     all dependency flows stop?
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))

leonoel 2024-03-08T09:12:30.325819Z

q1, q2: yes, however observe self-termination is under consideration https://github.com/leonoel/missionary/issues/6

leonoel 2024-03-08T09:13:12.904139Z

q3: yes, the reduce task terminates when its input flow terminates

leonoel 2024-03-08T09:14:58.830379Z

q4: ap terminates when all its dependencies terminate. It is also possible to cancel it with e.g. a downstream reduced, the cancellation will propagate to its deps

lotuc 2024-03-08T09:40:48.291639Z

Thanks! Then the question remains for the last one. Why does it block (while the second-to-last terminates)?

leonoel 2024-03-08T09:41:50.550349Z

in which example ?

lotuc 2024-03-08T09:42:55.762349Z

Those two

(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 2))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))
(run-reduce! (m/ap (let [v0 (m/?> (f-seq :f0 5))
                         v1 (m/?> (f-seq :f1 1))]
                     [v0 v1])))

leonoel 2024-03-08T10:08:39.940399Z

If you print the result of the producer you can get a hint

((m/via m/blk (doseq [i (range n)] (! i)))
 (fn [_] (prn :producer-done))
 (fn [e] (prn :producer-error e)))
The producer thread (from m/blk) crashes with "Can't process event - observer is not ready.". This is a backpressure error. The producer thread sends values as quickly as possible, competing with the consumer thread (from the repl), also receiving as quickly as possible. m/observe is not backpressured, so eventually the producer sends a value while the consumer thread is still processing the previous one, resulting in an exception thrown by !. At this point, m/observe stops producing and never reaches the target value 5.

leonoel 2024-03-08T10:10:40.127699Z

You can insert a sleep between sends, to emulate a slow producer (doseq [i (range n)] (Thread/sleep 10) (! i))

leonoel 2024-03-08T10:12:11.260679Z

Also under consideration - block the producer thread instead of crashing in this case https://github.com/leonoel/missionary/issues/93
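
For comparison, a source that does respect backpressure does not hit this race. The sketch below swaps m/observe for m/seed, which is an assumption about the use case: it only applies when the values can be pulled from a collection rather than pushed from a callback. The name `pairs` is made up for the illustration.

```clojure
(require '[missionary.core :as m])

;; Same nested-fork shape as the blocking example, but each source is a
;; backpressured m/seed flow, so no value is pushed before the consumer is ready.
(def pairs
  (m/? (m/reduce conj
                 (m/ap (let [v0 (m/?> (m/seed (range 5)))
                             v1 (m/?> (m/seed (range 1)))]
                         [v0 v1])))))

(prn pairs) ; [[0 0] [1 0] [2 0] [3 0] [4 0]]
```

Both seed flows terminate on their own, so the ap flow and the reduce task terminate too, without any `reduced` step.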

lotuc 2024-03-08T14:21:45.263299Z

Thanks.

Dallas Surewood 2024-03-08T18:27:50.714059Z

A couple of questions about threads in missionary.
• In Clojure JVM, are we always just using OS threads?
• What is the difference between using m/sp and using a specified executor like (m/via m/cpu ...)? I understand it's supposed to be optimized for CPU-bound tasks in some way, but how so?

leonoel 2024-03-08T19:10:50.790009Z

missionary doesn't expose any primitive akin to an OS thread, but leverages OS threads to run user code.
• sp emulates the sequential logic and park/resume behavior of threads. It doesn't bind the computation to any specific OS thread, and the (Thread/currentThread) usually changes during the lifecycle of an sp process. There must be only fast operations in the body; expensive calls must be wrapped in tasks and called with ?
• m/via does bind the computation to a single OS thread managed by the executor passed as first argument. Expensive operations are allowed, but the executor should be properly sized according to the nature of the computation: cpu-bound vs io-bound
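
One way to see the difference on the JVM is to compare thread names. This is only a sketch: `thread-name`, `caller`, `in-sp` and `in-via` are names made up for the illustration, and it assumes the forms are evaluated from an ordinary thread such as the REPL thread.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: name of the thread evaluating the call.
(defn thread-name [] (.getName (Thread/currentThread)))

(def caller (thread-name))

;; An sp body with no ? inside runs synchronously on the thread that starts it.
(def in-sp (m/? (m/sp (thread-name))))

;; A via body runs on a thread owned by the executor, here m/blk.
(def in-via (m/? (m/via m/blk (thread-name))))

(prn {:caller caller :sp in-sp :via in-via})
```

The :sp entry should match :caller, while :via should name a thread from the m/blk pool.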

Dallas Surewood 2024-03-08T19:33:49.229729Z

So essentially, making expensive calls in m/sp requires you to wrap m/? around them so it can properly park the thread or give the task to another thread?
> m/via does bind the computation to a single OS thread managed by the executor passed as first argument.
Does this mean the task cannot be put on other threads the way something in m/sp could? I have noticed the docs have an example where 1000 m/via tasks are run. It sounds like that would be a bad idea if the tasks actually had to wait for a while.

leonoel 2024-03-08T19:42:08.334679Z

> So essentially, making expensive calls in m/sp requires you to wrap m/? around them so it can properly park the thread or give the task to another thread?
yes, basically all operators expect synchronous calls to be fast. Not fast means asynchronous, so be explicit and use via. Typical pattern:

(m/sp
  (m/? (m/via m/blk (io-bound-computation)))
  (m/? (m/via m/cpu (cpu-bound-computation))))

leonoel 2024-03-08T19:45:57.133359Z

(m/via m/blk
  (io-bound-computation)
  (m/? (m/via m/cpu (cpu-bound-computation))))
this works too, but here an OS thread is wasted waiting for the cpu task to complete

Dallas Surewood 2024-03-08T19:52:41.772329Z

So a task in m/via will never be given to another thread?

Dallas Surewood 2024-03-08T19:53:27.449899Z

In that case, is there a reason one would use (m/via m/blk) over just regular m/sp tasks? I would imagine when it comes to waiting for a task to finish, it makes more sense to park the thread than it does to have a discrete OS thread.

leonoel 2024-03-08T19:57:17.682899Z

The purpose of (m/via m/blk) is to interop with blocking IO. If you need to wait for a task to finish, use m/sp so you don't waste an OS thread doing so.

Dallas Surewood 2024-03-08T20:22:03.126839Z

Isn't blocking IO exactly the kind of thing I never want to waste an OS thread for? What's an example of blocking IO I'd want to block a whole OS thread for? Seems like I should always use m/sp

leonoel 2024-03-08T20:22:56.233499Z

you never want it, but sometimes you have no choice

leonoel 2024-03-08T20:23:10.406349Z

some APIs have no async version

Dallas Surewood 2024-03-08T20:25:14.118849Z

Ah, so it's just for things that don't have a way to run async.

leonoel 2024-03-08T20:26:08.604179Z

yes, that's the only reason

Dallas Surewood 2024-03-08T20:40:20.199109Z

So Thread/sleep is an example of an IO function we'd need to use with (m/via m/blk). And we can't use expensive operations in m/sp or we just shouldn't? Don't both of these run the expensive computations in parallel?

(m/? (m/join vector 
       (m/sp (expensive-calculation-1))
       (m/sp (expensive-calculation-2))))

(m/? (m/join vector
       (m/via m/cpu (expensive-calculation-1))
       (m/via m/cpu (expensive-calculation-2))))

leonoel 2024-03-08T20:42:31.078569Z

no, only the latter

Dallas Surewood 2024-03-08T20:45:01.395869Z

What is the former doing? Processing one at a time?

Dallas Surewood 2024-03-08T20:45:41.244379Z

Oh yeah

leonoel 2024-03-08T20:46:17.520989Z

yes, when an sp process is spawned it runs synchronously as much as it can until it encounters a ?

Dallas Surewood 2024-03-08T20:47:04.958819Z

Okay, so m/join doesn't automatically parallelize things, it relies on them having waiting logic of some kind. Which can only be done with CPU bound stuff if it's turned into the appropriate task.

leonoel 2024-03-08T20:50:16.636269Z

correct. in fact, m/via is the only operator touching OS threads - and consequently the only one to be unsupported in cljs
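
The difference between the two join forms can be checked with a rough timing. This is a sketch, not a benchmark: `slow` is a hypothetical stand-in for the expensive calls, `elapsed-ms` is a helper made up for the illustration, and m/blk is used instead of m/cpu only because the stand-in sleeps rather than computes.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for an expensive call: blocks for 200 ms.
(defn slow [x] (Thread/sleep 200) x)

;; Runs a task to completion from the calling thread, returns elapsed ms.
(defn elapsed-ms [task]
  (let [t0 (System/nanoTime)]
    (m/? task)
    (/ (- (System/nanoTime) t0) 1e6)))

;; Each sp body runs synchronously when join starts it, so the sleeps add up.
(def sp-ms
  (elapsed-ms (m/join vector (m/sp (slow 1)) (m/sp (slow 2)))))

;; Each via body runs on its own executor thread, so the sleeps overlap.
(def via-ms
  (elapsed-ms (m/join vector (m/via m/blk (slow 1)) (m/via m/blk (slow 2)))))

(prn {:sp-ms sp-ms :via-ms via-ms})
```

The sp version should take roughly the sum of the two sleeps and the via version roughly the longer of the two.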

Dallas Surewood 2024-03-08T21:23:52.422089Z

Thank you so much for clarifying. That helps a lot
