I have a missionary flow that needs to do run a missionary task when the flow gets Cancelled. But this seems not to be possible, as each time this flows gets cancelled I get exceptions on the m/?.
This is the code:
ns quanta.market.quote.current (:require [taoensso.timbre :as timbre :refer [debug info warn error]] [missionary.core :as m] [quanta.market.protocol :as p] [quanta.market.util :as util]) (:import [missionary Cancelled])) (defonce subscriptions (atom {})) (def lock (m/sem)) (defn subscribing-unsubscribing-quote-flow [qm sub] (info "get-quote will start a new subscription..") (let [q (p/last-trade-flow qm sub)] (m/? (p/subscribe-last-trade! qm sub)) (util/cont (m/ap (try (m/amb (m/?> q)) (catch Cancelled _ (do (info "get-quote will stop an existing subscription..") (m/? (p/unsubscribe-last-trade! qm sub)) (info "get-quote has unsubscribed. now removing from atom..") (m/holding lock (swap! subscriptions dissoc sub))))))))) (defn get-quote [qm sub] (or (get @subscriptions sub) (m/holding lock (let [qs (subscribing-unsubscribing-quote-flow qm sub)] (swap! subscriptions assoc sub qs) qs))))
Or a link: https://github.com/clojure-quant/quanta-market/blob/main/src/quanta/market/quote/current.clj
I did make two tests:
1. Run subscribe as a standalone task
https://github.com/clojure-quant/quanta-market/blob/main/demo/src/demo/dev/qm.clj
The standalone task works.
2. Trying to use the flow I posted above which fails: https://github.com/clojure-quant/quanta-market/blob/main/demo/src/demo/qm.clj
The issue is that I am using m/? Inside a (catch Cancelled) Block of a flow that us getting cancelled.
I believe that this is a valid scenario: a flow that does resource cleanup on cancelation. And that the cleanup is not 100% synchronous can also happen I would think.
Any ideas?
isn't the exception what you want to signal to you that the flow was canceled?
I found the solution! If I want to m/? Tasks in a cancelled flow . Then I have to wrap (m/compel task. Now it works
@v1nc3ntpull1ng the cancel exceptions is what I need to Catch, the problem was to "await" a task that needs to run on cancelation of the flow.
Oh cool!