Hi, I'm trying to model a task that consumes two discrete flows concurrently and reacts differently depending on which flow the value is coming from. Since I can't think of a way to do this in missionary, I created an example using core.async below.
The task's purpose is to keep a tally of the "cost" while work is being done. I modeled the flows as core.async channels. The cost increases by 10 when a job is posted on the job channel and by 1 when the timer ticks. Jobs are posted to the job channel every 0.5 seconds, while the timer ticks every 1 second.
The task terminates when the job channel is closed (after 10 posts).
Do you have any suggestions on what primitives I could use in Missionary to model the alts! operation, where it not only retrieves a value from the next available channel (i.e. flow) but also indicates which channel the value came from, so that the task can act accordingly? It is the latter I can't figure out how to model in missionary.
Thanks in advance!
(ns starter.coreasync
(:require [clojure.core.async :refer [alts! chan close! go >! timeout]]))
(def job-channel (chan 1))
(def timer-channel (chan 1))
(defn job-executor [interval-ms lifespan]
(go-loop [lifespan lifespan]
(if (> lifespan 0)
(do
(alts! [(timeout interval-ms)])
(>! job-channel :job)
(recur (dec lifespan)))
(close! job-channel))))
(defn clock-ticker [interval-ms]
(js/setTimeout #(go
(>! timer-channel :tick)
(clock-ticker interval-ms))
interval-ms))
(def task
(go
(job-executor 500 10)
(clock-ticker 1000)
(loop [round 0
cost 0]
(let [[v ch] (alts! [job-channel timer-channel])
round (inc round)]
(println :round round :cost cost :val v)
(cond
(and (= v nil) (= ch job-channel))
(println :completed :rounds round :cost cost)
(= ch job-channel)
(recur round (+ cost 10))
(= ch timer-channel)
(recur round (inc cost)))))))
;; => :round 1 :cost 0 :val :job
;; => :round 2 :cost 10 :val :tick
;; => :round 3 :cost 11 :val :job
;; => :round 4 :cost 21 :val :job
;; => :round 5 :cost 31 :val :tick
;; => :round 6 :cost 32 :val :job
;; => :round 7 :cost 42 :val :job
;; => :round 8 :cost 52 :val :tick
;; => :round 9 :cost 53 :val :job
;; => :round 10 :cost 63 :val :job
;; => :round 11 :cost 73 :val :tick
;; => :round 12 :cost 74 :val :job
;; => :round 13 :cost 84 :val :job
;; => :round 14 :cost 94 :val :tick
;; => :round 15 :cost 95 :val :job
;; => :round 16 :cost 105 :val nil
;; => :completed :rounds 16 :cost 105In an ap block, (m/amb= [:job (m/?> job)] [:timer (m/?> timer)])
Thanks, this has done the trick! I have a question: what is the difference between m/?> without the par argument and m/?<? They appear as if they are equivalent when the par argument is not provided.
(ns starter.missionambeq
(:require [missionary.core :as m]))
(defn job
[interval-ms lifespan]
(m/ap
(loop [lifespan lifespan]
(if (> lifespan 0)
(do
(m/? (m/sleep interval-ms))
(m/amb :job
(recur (dec lifespan))))
nil))))
(defn timer
[interval-ms]
(m/ap
(loop []
(m/? (m/sleep interval-ms))
(m/amb :timer
(recur)))))
(def task
(m/sp (let [job> (job 500 10)
timer> (timer 1000)
choice> (m/ap (m/amb= [:job (m/?> job>)]
[:timer (m/?> timer>)]))]
(m/? (m/reduce (fn [{:keys [round cost]} [flow v]]
(println :round round :cost cost :v v)
(let [round (inc round)]
(cond
(and (= v nil) (= flow :job))
(do
(println :completed :rounds round :cost cost)
(reduced :done))
(= flow :job)
{:round round :cost (+ cost 10)}
(= flow :timer)
{:round round :cost (inc cost)})))
{:round 0 :cost 0} choice>)))))
(task (partial println :success) (partial println :failure))
;; => :round 0 :cost 0 :v :job
;; => :round 1 :cost 10 :v :timer
;; => :round 2 :cost 11 :v :job
;; => :round 3 :cost 21 :v :job
;; => :round 4 :cost 31 :v :timer
;; => :round 5 :cost 32 :v :job
;; => :round 6 :cost 42 :v :job
;; => :round 7 :cost 52 :v :timer
;; => :round 8 :cost 53 :v :job
;; => :round 9 :cost 63 :v :job
;; => :round 10 :cost 73 :v :timer
;; => :round 11 :cost 74 :v :job
;; => :round 12 :cost 84 :v :job
;; => :round 13 :cost 94 :v :timer
;; => :round 14 :cost 95 :v :job
;; => :round 15 :cost 105 :v nil
;; => :completed :rounds 16 :cost 105
;; => :success :donehttps://cljdoc.org/d/missionary/missionary/b.39/doc/readme/tutorials/hello-flow