This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-03-10
(mi/? (mi/reduce conj []
        (mi/ap
          (try (mi/?< (mi/ap (mi/amb> 1
                                      (throw (ex-info "foo" {}))
                                      2
                                      (throw (ex-info "bar" {}))
                                      3)))
               (catch Exception e :err)))))
;; => [1 :err]
Should the flow be interrupted, or is this a bug?
This is the expected result: the outer ap is indeed interrupted after ?<, but nothing is catching the interruption, so the continuation is run to the end.
Hi. I'm hoping to get some help applying missionary to a problem I'm trying to solve
My goal:
• job requests (with a given id) initiated by humans, inserted into a queue
• jobs are reactively picked off of the queue and processed with limited concurrency
• at any point a cancellation request can be submitted for a job (by id) which should cancel that job if running or else remove it from the queue
I understand the basics of tasks and flows, and I've implemented a feature using ap, ?=, sem, holding and reduce, but it's not obvious to me whether I should be using m/reactor, m/watch, m/rdv, or following some of the examples here: https://github.com/leonoel/missionary/wiki/Creating-flows
(require '[missionary.core :as m])
(import 'missionary.Cancelled)
;; mailbox receiving user commands, expects a pair [id task] or [id nil] to cancel task previously sent
(def send! (m/mbx))
;; no more than 5 concurrent tasks
(def limit (m/sem 5))
;; task runner, processes commands forever until cancelled, terminates with vector of pairs [id result],
;; result can be ::cancelled if the task was cancelled mid-flight, ::skipped if cancelled before starting
(def stop!
  ((m/reduce conj []
     (m/ap
       (try
         (let [[id >e] (m/?= (m/group-by first (m/ap (m/? (m/?> (m/seed (repeat send!)))))))]
           [id (m/? (m/reduce (comp reduced {}) nil
                      (m/ap (when-some [t (second (m/?< >e))]
                              (try (m/holding limit
                                     (try (m/? t) (catch Cancelled _ ::cancelled)))
                                   (catch Cancelled _ ::skipped))))))])
         (catch Cancelled _ (m/amb>)))))
   prn prn))
;; send user commands
(do
  (send! [0 (m/sleep 1000 ::ok)])
  (send! [1 (m/sleep 1000 ::ok)])
  (send! [1 nil])
  (send! [2 (m/sleep 1000 ::ok)])
  (send! [3 (m/sleep 1000 ::ok)])
  (send! [4 (m/sleep 1000 ::ok)])
  (send! [5 (m/sleep 1000 ::ok)])
  (send! [6 (m/sleep 1000 ::ok)])
  (send! [7 (m/sleep 1000 ::ok)])
  (send! [7 nil])
  (stop!))
#_[[1 :user/cancelled]
   [7 :user/skipped]
   [0 :user/ok]
   [2 :user/ok]
   [4 :user/ok]
   [3 :user/ok]
   [5 :user/ok]
   [6 :user/ok]]
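For anyone reading along, here is a minimal sketch of the two primitives the runner above leans on, m/mbx for the command queue and m/sem with m/holding for the concurrency limit, shown in isolation. The names box, small-limit and limited are made up for this illustration, and the limit of 2 is arbitrary. It assumes a JVM REPL, where m/? outside a process block blocks the calling thread.

```clojure
(require '[missionary.core :as m])

;; A mailbox: calling it with one argument posts a value; used as a task,
;; it completes with the oldest posted value and removes it from the mailbox.
(def box (m/mbx))                 ; hypothetical name, for illustration only
(box [:job-1 :payload])
(box [:job-2 :payload])
(def first-command (m/? box))     ; fetches [:job-1 :payload]

;; A semaphore with 2 tokens: m/holding acquires a token before evaluating
;; its body and releases it afterwards, so at most 2 bodies run at once.
(def small-limit (m/sem 2))       ; hypothetical name, arbitrary limit

(defn limited
  "Task that sleeps 100 ms while holding a token, then completes with id."
  [id]
  (m/sp (m/holding small-limit
          (m/? (m/sleep 100 id)))))

;; m/join runs the three tasks concurrently; the third one has to wait
;; for a token to be released before it can start sleeping.
(def results (m/? (m/join vector (limited :a) (limited :b) (limited :c))))
```

The runner above combines the same pieces: commands are fetched from the mailbox repeatedly, and each job body is wrapped in m/holding so that no more than 5 run at a time.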
Thank you so much, @U053XQP4S 🙂