aleph

mh.meraji 2024-12-15T15:08:14.593359Z

Hello there, I'm trying to connect to the Gate.io order_book websocket via aleph. Here is the documentation for the server: https://www.gate.mt/docs/developers/apiv4/ws/en/#limited-level-full-order-book-snapshot Here is the client-side code I wrote:

(:require
   [clojure.pprint                                   :as pp]
   [cheshire.core                                    :as cheshire]
   [manifold.stream                                  :as s]
   [manifold.deferred                                :as d]
   [aleph.http                                       :as http]
   [aleph.http.websocket.common                      :as ws-common]
   [aleph.netty :as a.netty])


(def sample-url "")
(def sample-event (cheshire/generate-string {:time    (int (/ (System/currentTimeMillis) 1000))
                                             :channel "spot.order_book"
                                             :event   "subscribe"
                                             :payload ["BTC_USDT", "5", "100ms"]}))

@(s/put! @sample-socket sample-event)

(defn sample-process [message]
    (Thread/sleep 10000)
    (pp/pprint message))

(defn my-consume [f stream]
    (d/loop []
      (d/chain (s/take! stream ::drained)

               ;; if we got a message, run it through `f`
               (fn [msg]
                 (if (identical? ::drained msg)
                   ::drained
                   (f msg)))

               ;; wait for the result from `f` to be realized, and
               ;; recur, unless the stream is already drained
               (fn [result]
                 (when-not (identical? ::drained result)
                   (d/recur))))))

(my-consume sample-process @sample-socket)

The server sends updates every 100 ms, and each message carries a timestamp. In my consumer I wait 10 seconds before each message is processed. Because the buffer size is set to 0 and, according to the documentation, there is no server-side buffering, I expected to see a gap of 10 seconds between messages because of the wait. Instead, the consumer falls further and further behind indefinitely. Can anyone help me find where the messages are being buffered and how I can control the size of that buffer? Thanks to all in advance.

mh.meraji 2024-12-15T15:10:26.331029Z

BTW, I'm using [aleph "0.7.1"]

mh.meraji 2024-12-15T15:20:03.658829Z

Another thing that puzzles me is that s/close! blocks and never returns when I use Thread/sleep in the consuming function, but without the sleep s/close! works perfectly fine!
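
A possible explanation, offered as an assumption rather than something confirmed in this thread: the callbacks passed to d/chain run on whichever thread delivers the message, which for a websocket stream may be an I/O thread, so a Thread/sleep there could stall the same thread that s/close! depends on. The sketch below moves the slow work onto Manifold's execute pool with d/future. The names process-async and delay-ms are hypothetical and not part of aleph or manifold.

```clojure
(require '[manifold.deferred :as d]
         '[clojure.pprint :as pp])

;; Hypothetical variant of sample-process: the slow work runs inside
;; d/future, so the thread that delivered the message is released
;; immediately and only the returned deferred is waited on.
(defn process-async [delay-ms message]
  (d/future
    (Thread/sleep delay-ms)   ; simulated slow processing
    (pp/pprint message)
    message))                 ; the deferred yields the processed message
```

Because d/chain waits for a deferred returned by one of its steps, something like (my-consume (partial process-async 10000) @sample-socket) should still handle one message at a time. It does not stop the backlog from growing, it only avoids sleeping on the delivering thread.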

2024-12-15T17:27:23.157219Z

You need something like a dropping buffer

2024-12-15T17:28:15.687499Z

The websocket is going to continue to produce messages even while your consumer is paused for 10 seconds, and those just get queued up
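
One way to get dropping behaviour without relying on any buffer setting is to drain the stream as fast as it produces and remember only the newest message. This is a minimal sketch under that assumption, using only s/consume from manifold. The helpers keep-latest and take-latest! are hypothetical names, and nil is assumed never to be a real message.

```clojure
(require '[manifold.stream :as s])

(defn keep-latest
  "Consumes `source` with a cheap callback that overwrites an atom, so older
  messages are discarded. Returns the atom (nil means nothing is pending)."
  [source]
  (let [latest (atom nil)]
    (s/consume #(reset! latest %) source)
    latest))

(defn take-latest!
  "Atomically returns the newest stored message and clears the slot."
  [latest]
  (first (reset-vals! latest nil)))
```

A slow worker would then call (take-latest! latest) whenever it is ready for more work, for example with (def latest (keep-latest @sample-socket)), and skip the iteration when it gets nil. The trade-off is that every intermediate update is silently lost, which seems acceptable for full order book snapshots but would not be for incremental updates.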

mh.meraji 2024-12-16T08:34:17.005069Z

Thanks for your response. I thought setting buffer-size to 0 would make this a dropping buffer. Where is the buffer that's queuing up the messages? I really need to get the latest segment possible. I tried killing the socket when the messages get too old, but there is a chance that close! blocks indefinitely.