I have noticed a weird requirement when one is composing tasks:
(defn stats-t [asset]
  (current-v (p/get-topic bb-quote {:topic :asset/stats :asset asset})))

(m/? (stats-t "BTCUSDT"))
;; => {:open 68053.37,
;;     :index 68115.665287,
;;     :value 1.2143808019240878E9,
;;     :close 68123.6,
;;     :volume 17897.920765,
;;     :high 69209.26,
;;     :low 66635.03,
;;     :change 0.001,
;;     :asset "BTCUSDT"}

(defn volume-t [asset]
  (let [stats (stats-t asset)]
    (m/sp
      (let [s (m/? stats)]
        {:asset asset
         :value (:value s)
         :close (:close s)}))))
So stats-t is a fn that returns a task, and I can m/? that task. All good so far.
(defn volume-t [asset]
  (m/sp
    (let [stats (stats-t asset)
          s (m/? stats)]
      {:asset asset
       :value (:value s)
       :close (:close s)})))
The weird behavior lies in volume-t: when the (stats-t asset) binding is moved into the let inside the sp block, as in the second version, it stops working.
stats-t depends on a websocket connection that gets opened. When the task is created inside the sp block, the resource management of the websocket connection stops working.
It is weird because m/? always waits on a task.
But if it awaits a task that was created inside the sp block, then something goes wrong with missionary's resource management.
So it seems one has to keep in mind that creating the task inside the sp block is an "anti-pattern" for missionary sp code.
I think the weird behavior you observe is due to m/holding being called from an m/sp block. m/holding macroexpands to m/?, which is illegal in this case because it's not within the lexical scope of m/sp.
This is a known issue; the correct behavior would be to block the current thread on m/?, as if it were called outside of m/sp. As a workaround you can use a ReentrantLock.
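To make the ReentrantLock workaround concrete, here is a minimal sketch of a get-or-create helper guarded by a java.util.concurrent.locks.ReentrantLock instead of m/holding. The names get-or-create!, cache and make are hypothetical and not taken from the code in this thread; the assumption is only that the lock protects a "look up or create and store" step like the one in get-topic. Because it uses plain Java interop and no m/?, it can be called from any function, including one that is itself invoked from an m/sp block. It does block the calling thread while waiting for the lock.

```clojure
(import 'java.util.concurrent.locks.ReentrantLock)

(defn get-or-create!
  "Returns the value stored under k in the atom `cache`.
   If it is missing, creates it with (make k) while holding `lock`.
   Hypothetical helper, not part of missionary or of the code above."
  [^ReentrantLock lock cache k make]
  (or (get @cache k)
      (do
        (.lock lock)
        (try
          ;; re-check under the lock: another thread may have created it meanwhile
          (or (get @cache k)
              (let [v (make k)]
                (swap! cache assoc k v)
                v))
          (finally
            ;; always release the lock, even if make throws
            (.unlock lock))))))
```

In get-topic this would roughly take the place of the (m/holding lock ...) form, with the lock field holding a ReentrantLock rather than a missionary semaphore.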
stats-t is likely impure; otherwise it could be safely moved inside m/sp.
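A small sketch of what "pure" means here, assuming missionary is on the classpath. double-t, outside-t and inside-t are made-up names for illustration. Since double-t only builds a task value and has no side effects, it makes no difference whether it is called outside or inside the m/sp block:

```clojure
(require '[missionary.core :as m])

;; A pure task constructor: calling it only builds a task, nothing runs yet.
(defn double-t [x]
  (m/sp (* 2 x)))

;; Task created outside the sp block.
(defn outside-t [x]
  (let [t (double-t x)]
    (m/sp (m/? t))))

;; Task created inside the sp block.
(defn inside-t [x]
  (m/sp
    (let [t (double-t x)]
      (m/? t))))

(m/? (outside-t 21)) ;; => 42
(m/? (inside-t 21))  ;; => 42
```

If the two variants behave differently, as with stats-t, the constructor is doing something beyond building a task, such as taking a lock or touching shared state.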
What are p/get-topic and current-v?
(defn subscribing-unsubscribing-quote-flow [{:keys [websocket lock subscriptions] :as this} sub]
  (let [topic (format-topic-sub sub)
        msg-in (p/msg-in-flow websocket)
        topic-data-f (topic-data-flow msg-in topic)
        topic-f (topic-transformed-flow topic-data-f sub)
        conn-f (p/current-connection websocket)]
    (util/cont
      (m/ap
        (debug "get-quote will start a new subscription..")
        (let [conn (m/?> conn-f)
              _ (info "quote subscriber new connection: " conn)]
          (m/amb "listening to data")
          (m/? (s/subscription-start! conn topic))
          (try
            (m/amb (m/?> topic-f))
            (catch Cancelled _
              (do
                (debug "get-quote will stop an existing subscription..")
                (m/? (m/compel (s/subscription-stop! conn topic)))
                (debug "get-quote has unsubscribed. now removing from atom..")
                (m/holding lock
                  (swap! subscriptions dissoc sub))))))))))

(defrecord bybit-category-feed [opts websocket subscriptions lock]
  p/subscription-topic
  (get-topic [this sub]
    (or (get @subscriptions sub)
        (m/holding lock
          (let [qs (subscribing-unsubscribing-quote-flow this sub)]
            (swap! subscriptions assoc sub qs)
            qs)))))
(defn take-first-non-nil [f]
  ; flows dont implement deref
  (m/eduction (remove nil?) (take 1) f))

(defn current-v [f]
  (m/reduce (fn [_r v]
              ;(println "current-v: " v)
              v)
            nil
            (take-first-non-nil f)))
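For readers following along, here is how current-v behaves on a simple in-memory flow, assuming missionary is on the classpath. The m/seed flow is only a stand-in for the websocket topic flow; the two definitions are copied from above so the snippet runs on its own.

```clojure
(require '[missionary.core :as m])

(defn take-first-non-nil [f]
  ;; keep only the first non-nil value of the flow, then terminate
  (m/eduction (remove nil?) (take 1) f))

(defn current-v [f]
  ;; task that completes with the first non-nil value (or nil if there is none)
  (m/reduce (fn [_r v] v) nil (take-first-non-nil f)))

;; m/seed turns a collection into a discrete flow (stand-in for the topic flow)
(m/? (current-v (m/seed [nil nil 1 2 3]))) ;; => 1
```

So current-v turns a flow into a task that completes with the first non-nil value, which is why stats-t can be awaited with m/?.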
What I am trying to accomplish: I want a function that returns a flow of messages for a topic. The messages come in via a websocket, but before the messages for that topic arrive I have to subscribe to the topic on the websocket. I also have to subscribe to the topic again whenever the websocket disconnects and a new connection is established.
So I have a flow of connections, and then a flow for a topic that uses the connection flow. My analysis task basically takes the topic flow and returns the first result for that topic.