Hi, I can't seem to read off a web socket using deferred/loop or consume. Web socket seems to stop buffering new values or closes the socket? When it stops buffering the connection is still opened, not drained and has a pending take. Using aleph 0.4.6 and manifold 0.1.8 I've tried the latest releases of these as well without success. Namespace if you want to try it out:
(ns somenamespace
(:gen-class)
(:require
[aleph.http :as http]
[manifold.stream :as s]
[manifold.deferred :as d]
[cheshire.core :refer [encode decode]]))
Subscribe message and connect functions:
(defn submsg [topic symbol & {:keys [encoded?] :or {encoded? true}}]
"subscribe message"
(let [ msg {"topic" (if (keyword? topic) (name topic) topic)
"event" "sub"
"params" {"binary" false
"symbol" symbol}}]
(if encoded? (encode msg) msg)
))
(defn on-connect [sock]
(println "Connected.")
sock)
(defn on-close [sock]
(s/on-closed sock #(println "ws-closed."))
sock)
(defn ws-connect [uri]
"Connect to web socket"
(let [options {:insecure? false
:max-frame-payload (* 10 65536)
:max-frame-size (* 10 1048567)}
sock @(http/websocket-client uri options)]
(-> sock
on-connect
on-close)
sock))
At the repl connect and subscribe with:
(def ws (ws-connect ""))
(s/put! ws (submsg :trade "BTCUSDT"))
Then repeatedly evaluating the following from the repl works as expected, the buffer goes to zero, new data keeps arriving, keeps printing:
(do (prn "buffer-size: " (:buffer-size (:source (s/description ws))))
(s/take! ws ::Closed))
However loop seems to stop buffering new values after printing the values in socket source buffer:
(d/loop []
(-> (s/take! ws ::Closed)
(d/chain
#(do (println (str "buffer-size AfterTake: " (:buffer-size (:source (s/description ws)))))
(println (str %))
(if-not (= ::Closed %)
(d/recur))))
(d/catch
(fn [ex]
(println (str "ERROR: " ex))
(s/close! ws)))))
and this closes the socket after printing one value:
(s/consume-async #((prn 'message! (str %)) true) ws)
Use these to view ws state and close socket:
(s/description ws)
(s/close! ws)
I've tried different library versions, consume, consume-async, frame-size, frame payload and different take! loop recur implementations. No idea what else to do? Any help would be very much appreciated!