#missionary
2022-03-10
ribelo00:03:24

(mi/? (mi/reduce conj [] (mi/ap (try (mi/?< (mi/ap (mi/amb> 1 (throw (ex-info "foo" {})) 2 (throw (ex-info "bar" {})) 3)))
                                     (catch Exception e :err)))))
;; => [1 :err]
should the flow be interrupted or is this a bug?

leonoel09:03:04

This is the expected result. The outer ap is indeed interrupted after ?<, but nothing catches the interruption, so the continuation runs to the end.
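For contrast, here is a minimal sketch of a continuation that does catch the interruption, modelled on the debounce idiom from the missionary documentation. The name `latest-only` is hypothetical, and `m/amb>` is used because the code later in this thread uses it; the spelling may differ in other missionary releases. The top-level `m/?` blocks the calling thread, so this assumes the JVM.

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; Hypothetical helper: for each value of `flow`, wait `delay-ms` before
;; emitting it. When ?< switches to a newer value, the pending sleep is
;; cancelled and throws Cancelled; catching it and returning (m/amb>)
;; makes the interrupted branch emit nothing.
(defn latest-only [delay-ms flow]
  (m/ap
    (let [x (m/?< flow)]
      (try (m/? (m/sleep delay-ms x))
           (catch Cancelled _ (m/amb>))))))

;; Collect whatever survives the interruptions.
(def result
  (m/? (m/reduce conj [] (latest-only 100 (m/seed [1 2 3])))))
```

Without the `catch`, the Cancelled exception would propagate out of the branch instead of being turned into an empty result.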

adamfrey18:03:24

Hi. I'm hoping to get some help applying missionary to a problem I'm trying to solve. My goal:
• job requests (with a given id) are initiated by humans and inserted into a queue
• jobs are reactively picked off the queue and processed with limited concurrency
• at any point a cancellation request can be submitted for a job (by id), which should cancel that job if it is running or else remove it from the queue

I understand the basics of tasks and flows, and I've implemented a feature using ap, ?=, sem, holding and reduce, but it's not obvious to me whether I should be using m/reactor, m/watch, m/rdv, or following some of the examples here: https://github.com/leonoel/missionary/wiki/Creating-flows

leonoel17:03:58

(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; mailbox receiving user commands, expects a pair [id task] or [id nil] to cancel task previously sent
(def send! (m/mbx))

;; no more than 5 concurrent tasks
(def limit (m/sem 5))

;; task runner, processes commands forever until cancelled, terminates with vector of pairs [id result],
;; result can be ::cancelled if the task was cancelled mid-flight, ::skipped if cancelled before starting
(def stop!
  ((m/reduce conj []
     (m/ap
       (try
         (let [[id >e] (m/?= (m/group-by first (m/ap (m/? (m/?> (m/seed (repeat send!)))))))]
           [id (m/? (m/reduce (comp reduced {}) nil
                      (m/ap (when-some [t (second (m/?< >e))]
                              (try (m/holding limit
                                     (try (m/? t) (catch Cancelled _ ::cancelled)))
                                   (catch Cancelled _ ::skipped))))))])
         (catch Cancelled _ (m/amb>)))))
   prn prn))

;; send user commands
(do
  (send! [0 (m/sleep 1000 ::ok)])
  (send! [1 (m/sleep 1000 ::ok)])
  (send! [1 nil])
  (send! [2 (m/sleep 1000 ::ok)])
  (send! [3 (m/sleep 1000 ::ok)])
  (send! [4 (m/sleep 1000 ::ok)])
  (send! [5 (m/sleep 1000 ::ok)])
  (send! [6 (m/sleep 1000 ::ok)])
  (send! [7 (m/sleep 1000 ::ok)])
  (send! [7 nil])
  (stop!))

#_[[1 :user/cancelled]
   [7 :user/skipped]
   [0 :user/ok]
   [2 :user/ok]
   [4 :user/ok]
   [3 :user/ok]
   [5 :user/ok]
   [6 :user/ok]]
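
The `(comp reduced {})` reducing function in the runner above can look cryptic. A minimal sketch in plain Clojure, with no missionary involved, of what it appears to do: an empty map called with two arguments returns the second one (a lookup with a default), and `reduced` then stops the reduction, so the result is the first value seen. The name `first-value` is only for illustration.

```clojure
;; ({} k default) looks up k in an empty map, finds nothing,
;; and returns default. As a reducing function (acc, x) it
;; therefore ignores acc and returns x.
(def first-value (comp reduced {}))

;; reduced short-circuits the reduction, so only the first
;; element is ever consumed.
(reduce first-value nil [:a :b :c])
;; => :a
```

In the runner, this presumably makes each per-id `m/reduce` finish with the first result the inner flow emits, whether that is the task's value, `::cancelled`, or `::skipped`.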

adamfrey21:03:56

Thank you so much, @leonoel 🙂