This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
(defn stats-t [asset] (current-v (p/get-topic bb-quote {:topic :asset/stats :asset asset}))) (m/? (stats-t "BTCUSDT")) ;; => {:open 68053.37, ;; :index 68115.665287, ;; :value 1.2143808019240878E9, ;; :close 68123.6, ;; :volume 17897.920765, ;; :high 69209.26, ;; :low 66635.03, ;; :change 0.001, ;; :asset "BTCUSDT"} (defn volume-t [asset] (let [stats (stats-t asset)] (m/sp (let [s (m/? stats)] {:asset asset :value (:value s) :close (:close s)}))))
(defn volume-t [asset] (m/sp (let [stats (stats-t asset) s (m/? stats)] {:asset asset :value (:value s) :close (:close s)})))
The weird behavior lies in volume-t: when the let (stats-t asset) is moved inside the let inside the sp block .. then it stops working.
Stats-t is depending on a websocket connection that gets open. When it is created inside the sp block .. then the resource management of the websocket connection stops working.
But if it awaits on a task that is created inside the sp block .. then it something goes wrong with the resource management of missionary.
defn subscribing-unsubscribing-quote-flow [{:keys [websocket lock subscriptions] :as this} sub] (let [topic (format-topic-sub sub) msg-in (p/msg-in-flow websocket) topic-data-f (topic-data-flow msg-in topic) topic-f (topic-transformed-flow topic-data-f sub) conn-f (p/current-connection websocket)] (util/cont (m/ap (debug "get-quote will start a new subscription..") (let [conn (m/?> conn-f) _ (info "quote subscriber new connection: " conn)] (m/amb "listening to data") (m/? (s/subscription-start! conn topic)) (try (m/amb (m/?> topic-f)) (catch Cancelled _ (do (debug "get-quote will stop an existing subscription..") (m/? (m/compel (s/subscription-stop! conn topic))) (debug "get-quote has unsubscribed. now removing from atom..") (m/holding lock (swap! subscriptions dissoc sub)))))))))) (defrecord bybit-category-feed [opts websocket subscriptions lock] p/subscription-topic (get-topic [this sub] (or (get @subscriptions sub) (m/holding lock (let [qs (subscribing-unsubscribing-quote-flow this sub)] (swap! subscriptions assoc sub qs) qs)))))
(defn take-first-non-nil [f] ; flows dont implement deref (m/eduction (remove nil?) (take 1) f)) (defn current-v [f] (m/reduce (fn [_r v] ;(println "current-v: " v) v) nil (take-first-non-nil f)))
What I try to accomplish is: I want to get a flow that returns a flow of messages for a topic. The messages come in via a websocket. But before the messages for that topic come in I have to subscribe the topic on the websocket. I also have to subscribe for the topic if the websocket disconnects.
So I have a flow of connections. Then a flow for a topic that uses the connection flow. And my analysis task basically gets the topic-flow and returns the first topic result.