missionary

chaos 2024-05-14T19:40:29.655289Z

Hi, I'm trying to model a task that consumes two discrete flows concurrently and reacts differently depending on which flow the value is coming from. Since I can't think of a way to do this in missionary, I created an example using core.async below. The task's purpose is to keep a tally of the "cost" while work is being done. I modeled the flows as core.async channels. The cost increases by 10 when a job is posted on the job channel and by 1 when the timer ticks. Jobs are posted to the job channel every 0.5 seconds, while the timer ticks every 1 second. The task terminates when the job channel is closed (after 10 posts). Do you have any suggestions on what primitives I could use in Missionary to model the alts! operation, where it not only retrieves a value from the next available channel (i.e. flow) but also indicates which channel the value came from, so that the task can act accordingly? It is the latter I can't figure out how to model in missionary. Thanks in advance!

(ns starter.coreasync
  (:require [clojure.core.async :refer [alts! chan close! go >! timeout]]))

(def job-channel (chan 1))
(def timer-channel (chan 1))

(defn job-executor [interval-ms lifespan]
  (go-loop [lifespan lifespan]
    (if (> lifespan 0)
      (do
        (alts! [(timeout interval-ms)])
        (>! job-channel :job)
        (recur (dec lifespan)))

      (close! job-channel))))

(defn clock-ticker [interval-ms]
  (js/setTimeout #(go
                    (>! timer-channel :tick)
                    (clock-ticker interval-ms))
                 interval-ms))

(def task
  (go
    (job-executor 500 10)
    (clock-ticker 1000)
    (loop [round 0
           cost 0]
      (let [[v ch] (alts! [job-channel timer-channel])
            round (inc round)]
        (println :round round :cost cost :val v)
        (cond
          (and (= v nil) (= ch job-channel))
          (println :completed :rounds round :cost cost)

          (= ch job-channel)
          (recur round (+ cost 10))

          (= ch timer-channel)
          (recur round (inc cost)))))))

;; => :round 1 :cost 0 :val :job
;; => :round 2 :cost 10 :val :tick
;; => :round 3 :cost 11 :val :job
;; => :round 4 :cost 21 :val :job
;; => :round 5 :cost 31 :val :tick
;; => :round 6 :cost 32 :val :job
;; => :round 7 :cost 42 :val :job
;; => :round 8 :cost 52 :val :tick
;; => :round 9 :cost 53 :val :job
;; => :round 10 :cost 63 :val :job
;; => :round 11 :cost 73 :val :tick
;; => :round 12 :cost 74 :val :job
;; => :round 13 :cost 84 :val :job
;; => :round 14 :cost 94 :val :tick
;; => :round 15 :cost 95 :val :job
;; => :round 16 :cost 105 :val nil
;; => :completed :rounds 16 :cost 105

xificurC 2024-05-14T20:21:08.210559Z

In an ap block, (m/amb= [:job (m/?> job)] [:timer (m/?> timer)])

chaos 2024-05-14T21:45:53.159319Z

Thanks, this has done the trick! I have a question: what is the difference between m/?> without the par argument and m/?<? They appear as if they are equivalent when the par argument is not provided.

(ns starter.missionambeq
  (:require [missionary.core :as m]))

(defn job
  [interval-ms lifespan]
  (m/ap
    (loop [lifespan lifespan]
      (if (> lifespan 0)
        (do
          (m/? (m/sleep interval-ms))
          (m/amb :job
           (recur (dec lifespan))))

        nil))))

(defn timer
  [interval-ms]
  (m/ap
    (loop []
      (m/? (m/sleep interval-ms))
      (m/amb :timer
       (recur)))))

(def task
  (m/sp (let [job> (job 500 10)
              timer> (timer 1000)
              choice> (m/ap (m/amb= [:job (m/?> job>)]
                                    [:timer (m/?> timer>)]))]

          (m/? (m/reduce (fn [{:keys [round cost]} [flow v]]
                           (println :round round :cost cost :v v)
                           (let [round (inc round)]
                             (cond
                               (and (= v nil) (= flow :job))
                               (do
                                 (println :completed :rounds round :cost cost)
                                 (reduced :done))

                               (= flow :job)
                               {:round round :cost (+ cost 10)}

                               (= flow :timer)
                               {:round round :cost (inc cost)})))
                         
                         {:round 0 :cost 0} choice>)))))

(task (partial println :success) (partial println :failure))
;; => :round 0 :cost 0 :v :job
;; => :round 1 :cost 10 :v :timer
;; => :round 2 :cost 11 :v :job
;; => :round 3 :cost 21 :v :job
;; => :round 4 :cost 31 :v :timer
;; => :round 5 :cost 32 :v :job
;; => :round 6 :cost 42 :v :job
;; => :round 7 :cost 52 :v :timer
;; => :round 8 :cost 53 :v :job
;; => :round 9 :cost 63 :v :job
;; => :round 10 :cost 73 :v :timer
;; => :round 11 :cost 74 :v :job
;; => :round 12 :cost 84 :v :job
;; => :round 13 :cost 94 :v :timer
;; => :round 14 :cost 95 :v :job
;; => :round 15 :cost 105 :v nil
;; => :completed :rounds 16 :cost 105
;; => :success :done