#missionary
2024-07-28
awb99 22:07:46

I have noticed a weird requirement when one is composing tasks:

awb99 22:07:27

(defn stats-t [asset]
  (current-v (p/get-topic bb-quote {:topic :asset/stats :asset asset})))

(m/? (stats-t "BTCUSDT"))
;; => {:open 68053.37,
;;     :index 68115.665287,
;;     :value 1.2143808019240878E9,
;;     :close 68123.6,
;;     :volume 17897.920765,
;;     :high 69209.26,
;;     :low 66635.03,
;;     :change 0.001,
;;     :asset "BTCUSDT"}

(defn volume-t [asset]
  (let [stats (stats-t asset)]
    (m/sp
      (let [s (m/? stats)]
        {:asset asset
         :value (:value s)
         :close (:close s)}))))

awb99 22:07:32

So stats-t is a fn that returns a task. I can m/? on this task. All good so far.

awb99 22:07:36

(defn volume-t [asset]
  (m/sp
    (let [stats (stats-t asset)
          s (m/? stats)]
      {:asset asset
       :value (:value s)
       :close (:close s)})))

awb99 22:07:40

The weird behavior lies in volume-t: when the (stats-t asset) binding is moved into the let inside the sp block, it stops working.

awb99 22:07:46

stats-t depends on a websocket connection that gets opened. When the task is created inside the sp block, the resource management of the websocket connection stops working.

awb99 22:07:55

It is weird because m/? always waits on a task.

awb99 22:07:36

But if it waits on a task that is created inside the sp block, then something goes wrong with missionary's resource management.

awb99 22:07:57

One has to keep in mind that creating the task inside the sp block is an "anti-pattern" for missionary sp code.

leonoel 08:07:22

stats-t is likely impure, otherwise it could be safely moved inside m/sp
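
To make the point about impurity concrete, here is a minimal sketch, assuming missionary is on the classpath. The names `impure-t`, `outside`, `inside` and `log` are hypothetical and only stand in for a task constructor like stats-t that performs a side effect when it is called, not when the returned task is run.

```clojure
(require '[missionary.core :as m])

;; Hypothetical log of side effects, for illustration only.
(def log (atom []))

;; An impure task constructor: the side effect happens when the
;; function is called, before the returned task is ever run.
(defn impure-t [x]
  (swap! log conj [:constructed x])
  (m/sp x))

;; Constructor called outside m/sp: the side effect runs as soon as
;; `outside` is called, on the caller's thread.
(defn outside [x]
  (let [t (impure-t x)]
    (m/sp (m/? t))))

;; Constructor called inside m/sp: the side effect is deferred until
;; the task is run, and it executes within the sp process.
(defn inside [x]
  (m/sp (m/? (impure-t x))))

(def t1 (outside 1)) ; log already contains [:constructed 1]
(def t2 (inside 2))  ; log is unchanged until t2 is run
```

With a pure constructor the two variants would be interchangeable. With an impure one, the side effect runs at a different time and in a different context, which is where the two versions of volume-t can diverge.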

leonoel 08:07:17

what are p/get-topic and current-v ?

awb99 21:07:33

(defn subscribing-unsubscribing-quote-flow
  [{:keys [websocket lock subscriptions] :as this} sub]
  (let [topic (format-topic-sub sub)
        msg-in (p/msg-in-flow websocket)
        topic-data-f (topic-data-flow msg-in topic)
        topic-f (topic-transformed-flow topic-data-f sub)
        conn-f (p/current-connection websocket)]
    (util/cont
      (m/ap
        (debug "get-quote will start a new subscription..")
        (let [conn (m/?> conn-f)
              _ (info "quote subscriber new connection: " conn)]
          (m/amb "listening to data")
          (m/? (s/subscription-start! conn topic))
          (try
            (m/amb (m/?> topic-f))
            (catch Cancelled _
              (do
                (debug "get-quote will stop an existing subscription..")
                (m/? (m/compel (s/subscription-stop! conn topic)))
                (debug "get-quote has unsubscribed. now removing from atom..")
                (m/holding lock
                  (swap! subscriptions dissoc sub))))))))))

(defrecord bybit-category-feed [opts websocket subscriptions lock]
  p/subscription-topic
  (get-topic [this sub]
    (or (get @subscriptions sub)
        (m/holding lock
          (let [qs (subscribing-unsubscribing-quote-flow this sub)]
            (swap! subscriptions assoc sub qs)
            qs)))))

awb99 21:07:22

(defn take-first-non-nil [f]
  ; flows dont implement deref
  (m/eduction (remove nil?) (take 1) f))

(defn current-v [f]
  (m/reduce (fn [_r v]
              ;(println "current-v: " v)
              v)
            nil
            (take-first-non-nil f)))

awb99 21:07:25

What I am trying to accomplish is: I want a function that returns a flow of messages for a topic. The messages come in via a websocket. But before the messages for that topic come in, I have to subscribe to the topic on the websocket. I also have to subscribe to the topic again if the websocket disconnects and reconnects.

awb99 21:07:13

So I have a flow of connections. Then a flow for a topic that uses the connection flow. And my analysis task basically gets the topic-flow and returns the first topic result.

leonoel 15:07:14

I think the weird behavior you observe is due to m/holding being called from an m/sp block. m/holding macroexpands to m/?, which is illegal in this case because it's not within the lexical scope of m/sp.

leonoel 16:07:56

This is a known issue. The correct behavior is to block the current thread on m/?, as if it were called outside of m/sp. As a workaround you can use a ReentrantLock.
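
The ReentrantLock workaround could look roughly like the sketch below. It is not the original code: `get-or-create!` and the standalone `subscriptions` atom are hypothetical stand-ins for get-topic and the record's fields, and the re-check under the lock is an addition that the snippet in this thread does not have. The lock is plain Java interop, so it blocks the calling thread and does not rely on m/? at all.

```clojure
(import '(java.util.concurrent.locks ReentrantLock))

;; Hypothetical registry, standing in for the `subscriptions` atom above.
(def subscriptions (atom {}))

;; A JVM lock instead of a missionary semaphore used via m/holding.
(def ^ReentrantLock lock (ReentrantLock.))

(defn get-or-create!
  "Returns the value registered under k, calling (make k) at most once."
  [k make]
  (or (get @subscriptions k)
      (do
        ;; Blocks the current thread until the lock is available.
        (.lock lock)
        (try
          ;; Re-check under the lock: another thread may have
          ;; registered k while this one was waiting.
          (or (get @subscriptions k)
              (let [v (make k)]
                (swap! subscriptions assoc k v)
                v))
          (finally
            (.unlock lock))))))
```

Since no m/? is involved, a function written this way behaves the same whether it is called from inside or outside an m/sp block. The body under the lock should stay short, because the lock holds a real thread while it waits.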