#aleph
2022-07-06
Javier Pereira 16:07:59

Hi, I can't seem to read from a web socket using deferred/loop or consume. The web socket seems to stop buffering new values, or the socket gets closed. When it stops buffering, the connection is still open, not drained, and has a pending take. I'm using aleph 0.4.6 and manifold 0.1.8; I've also tried the latest releases of both, without success. Here is the namespace if you want to try it out:

(ns somenamespace
  (:gen-class)
  (:require
   [aleph.http :as http]
   [manifold.stream :as s]
   [manifold.deferred :as d]
   [cheshire.core :refer [encode decode]]))
Subscribe message and connect functions:
(defn submsg
  "Subscribe message for `topic` and `symbol`; JSON-encoded unless `:encoded?` is false."
  [topic symbol & {:keys [encoded?] :or {encoded? true}}]
  (let [msg {"topic" (if (keyword? topic) (name topic) topic)
             "event" "sub"
             "params" {"binary" false
                       "symbol" symbol}}]
    (if encoded? (encode msg) msg)))

(defn on-connect [sock]
  (println "Connected.")
  sock)

(defn on-close [sock]
  (s/on-closed sock #(println "ws-closed."))
  sock)

(defn ws-connect
  "Connect to web socket."
  [uri]
  (let [options {:insecure? false
                 :max-frame-payload (* 10 65536)
                 :max-frame-size (* 10 1048567)}
        sock @(http/websocket-client uri options)]
    (-> sock
        on-connect
        on-close)
    sock))
At the REPL, connect and subscribe with:
(def ws (ws-connect ""))
(s/put! ws (submsg :trade "BTCUSDT"))
Repeatedly evaluating the following from the REPL works as expected: the buffer goes to zero, new data keeps arriving, and it keeps printing:
(do (prn "buffer-size: " (:buffer-size (:source (s/description ws))))
    (s/take! ws ::Closed))
However, d/loop seems to stop buffering new values after printing the values already in the socket's source buffer:
(d/loop []
  (-> (s/take! ws ::Closed)
      (d/chain
       #(do (println (str "buffer-size AfterTake: " (:buffer-size (:source (s/description ws)))))
            (println (str %))
            (if-not (= ::Closed %)
              (d/recur))))
      (d/catch
       (fn [ex]
         (println (str "ERROR: " ex))
         (s/close! ws)))))
And this closes the socket after printing one value:
(s/consume-async #((prn 'message! (str %)) true) ws)
Use these to view the ws state and close the socket:
(s/description ws)
(s/close! ws)
I've tried different library versions, consume, consume-async, different frame-size and frame-payload settings, and different take! loop/recur implementations. I have no idea what else to try. Any help would be very much appreciated!
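One detail in the consume-async call above may be worth checking in isolation. The form `#((prn 'message! (str %)) true)` reads as `(fn [x] ((prn ...) true))`, so it calls the return value of `prn` (which is `nil`) as a function, and that throws. An exception in the callback is one plausible reason the socket closes after a single value, though that is not confirmed here. The sketch below is a minimal illustration under stated assumptions: `in`, `seen`, `threw?` and `done` are hypothetical names, and an in-memory manifold stream stands in for the web socket. The callback returns a deferred yielding `true`, as the consume-async docstring asks.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; prn returns nil, and calling nil as a function throws a
;; NullPointerException, which is what #((prn ...) true) does.
(def threw?
  (try
    ((prn 'message! "x") true)
    false
    (catch NullPointerException _ true)))

;; Hypothetical stand-in for the web socket: an in-memory stream.
(def in (s/stream 10))
(def seen (atom []))

;; The callback records the message, then returns a deferred yielding
;; true so that consumption continues.
(def done
  (s/consume-async
   (fn [msg]
     (swap! seen conj (str msg))
     (d/success-deferred true))
   in))

(doseq [m ["a" "b" "c"]]
  @(s/put! in m))
(s/close! in)

;; Wait (up to one second) for the consumer to finish before reading `seen`.
(deref done 1000 ::timeout)
```

If the in-memory version consumes every message while the web socket version still stalls, that would suggest the problem lies in the connection rather than in the callback.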