Hello there, I'm trying to connect to gateio order_book websocket via aleph here is the document for the server : https://www.gate.mt/docs/developers/apiv4/ws/en/#limited-level-full-order-book-snapshot Here is the client side code i wrote
(:require
[clojure.pprint :as pp]
[cheshire.core :as cheshire]
[manifold.stream :as s]
[manifold.deferred :as d]
[aleph.http :as http]
[aleph.http.websocket.common :as ws-common]
[aleph.netty :as a.netty])
(def sample-url " ")
(def sample-event (cheshire/generate-string {:time (int (/ (System/currentTimeMillis) 1000))
:channel "spot.order_book"
:event "subscribe"
:payload ["BTC_USDT", "5", "100ms"]}))
@(s/put! @sample-socket sample-event)
(defn sample-process [message]
(Thread/sleep 10000)
(pp/pprint message))
(defn my-consume [f stream]
(d/loop []
(d/chain (s/take! stream ::drained)
;; if we got a message, run it through `f`
(fn [msg]
(if (identical? ::drained msg)
::drained
(f msg)))
;; wait for the result from `f` to be realized, and
;; recur, unless the stream is already drained
(fn [result]
(when-not (identical? ::drained result)
(d/recur))))))
(my-consume sample-process @sample-socket)
the server side is sending updates every 100 ms and each message has a timestamp
in my consumer i wait for 10 second before each message is processed
i expected to see a gap of 10 seconds because of the wait
because the buffer size is set to 0 and there is no server side buffering according to the documentation
but the consumer gets delayed behind indefinitely
can anyone help me find where the message is being buffered and how can i control the size of it?
Thanks to all in advance.BTW i'm using [aleph "0.7.1"]
another thing that makes me wonder is that s/close! doesn't work and return when i use Thread/sleep in the consumption function but if there is no sleep the s/close! works perfectly fine!
You need something like a dropping buffer
The websocket is going to continue to produce messages even while your consumer is paused for 10 seconds, and those just get queued up
Thanks for your response. I thought setting buffer-size to 0 will make this a dropping buffer. where is this buffer that's queuing up the messages? I really need to get the latest segment possible i tried killing the socket when the message gets too old but there is a chance that close! gets blocked indefinitely.