2022-07-06
Hi, I can't seem to read from a web socket using deferred/loop or consume. The web socket either stops buffering new values or closes. When it stops buffering, the connection is still open, not drained, and has a pending take. I'm using aleph 0.4.6 and manifold 0.1.8; I've also tried the latest releases of both without success. Here is the namespace if you want to try it out:
(ns somenamespace
  (:gen-class)
  (:require
   [aleph.http :as http]
   [manifold.stream :as s]
   [manifold.deferred :as d]
   [cheshire.core :refer [encode decode]]))
Subscribe message and connect functions:
(defn submsg
  "subscribe message"
  [topic symbol & {:keys [encoded?] :or {encoded? true}}]
  (let [msg {"topic"  (if (keyword? topic) (name topic) topic)
             "event"  "sub"
             "params" {"binary" false
                       "symbol" symbol}}]
    ;; JSON-encode by default; pass :encoded? false to get the raw map
    (if encoded? (encode msg) msg)))
(defn on-connect [sock]
  (println "Connected.")
  sock)
(defn on-close [sock]
  (s/on-closed sock #(println "ws-closed."))
  sock)
(defn ws-connect
  "Connect to web socket"
  [uri]
  (let [options {:insecure? false
                 :max-frame-payload (* 10 65536)
                 :max-frame-size (* 10 1048567)}
        sock @(http/websocket-client uri options)]
    (-> sock
        on-connect
        on-close)
    sock))
At the REPL, connect and subscribe with:
(def ws (ws-connect ""))
(s/put! ws (submsg :trade "BTCUSDT"))
Repeatedly evaluating the following from the REPL works as expected: the buffer drains to zero, new data keeps arriving, and values keep printing:
(do (prn "buffer-size: " (:buffer-size (:source (s/description ws))))
    (s/take! ws ::Closed))
However, d/loop seems to stop buffering new values after printing the values already in the socket's source buffer:
(d/loop []
  (-> (s/take! ws ::Closed)
      (d/chain
       #(do (println (str "buffer-size AfterTake: " (:buffer-size (:source (s/description ws)))))
            (println (str %))
            (if-not (= ::Closed %)
              (d/recur)))))
      (d/catch
       (fn [ex]
         (println (str "ERROR: " ex))
         (s/close! ws)))))
And this closes the socket after printing one value:
(s/consume-async #((prn 'message! (str %)) true) ws)
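One thing worth checking in the form above: `#((prn ...) true)` expands to `(fn [x] ((prn ...) true))`, so it invokes the return value of `prn` (which is `nil`) as a function and throws a NullPointerException after the first print. That may be what closes the stream, though this is not confirmed here. A minimal sketch of a callback with the shape `consume-async` expects (a function returning a deferred that yields `true` to keep consuming), using a local stream named `demo-stream` as a hypothetical stand-in for the web socket:

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical local stand-in for `ws`, so the sketch runs without a network.
(def demo-stream (s/stream 10))

(s/consume-async
  (fn [msg]
    ;; Print first, then return a deferred yielding true to keep consuming.
    (prn 'message! (str msg))
    (d/success-deferred true))
  demo-stream)

;; Each put! is delivered to the callback above.
@(s/put! demo-stream "first")
@(s/put! demo-stream "second")
(s/close! demo-stream)
```

With the real socket the same callback would be passed with `ws` in place of `demo-stream`.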
Use these to view ws state and close socket:
(s/description ws)
(s/close! ws)
I've tried different library versions, consume, consume-async, different frame size and frame payload settings, and different take!/loop/recur implementations. I have no idea what else to try. Any help would be very much appreciated!