This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-10-24
Code review wanted for this automatically reconnecting websocket client. The idea is to have something that acts exactly like the aleph websocket client, except that it uses the source/sink streams you provide and reconnects those streams when the connection is closed unintentionally. I'm mainly wondering whether the stream splicing/connection logic could be improved.
;;;goal: pass input/output streams which connect to a websocket client duplex
;;;stream. when the inner websocket stream is closed unintentionally by server,
;;;make a new one and reconnect the input/output streams. close the websocket
;;;stream when the output stream is closed.
(require '[manifold.stream :as ms]
         '[manifold.deferred :as md]
         '[aleph.http :as http]
         '[taoensso.timbre :as log]
         '[clojure.data.json :as json])
(defn ws-conn [url]
  (http/websocket-client url
                         {:max-frame-payload 1e7 :max-frame-size 1e7}))
(defn reconnecting-websocket-stream [{::keys [url on-connect
                                              in-stream
                                              out-stream]
                                      :or {in-stream (ms/stream)
                                           out-stream (ms/stream)}}]
  (let [in* (ms/stream* {:permanent? true :buffer-size 1000})
        _ (ms/connect in-stream in*)
        s (ms/splice in-stream out-stream)
        renew-connection
        (fn this
          ([] (this 0 (quot (System/currentTimeMillis) 1000)))
          ([n last-reset-second]
           (md/chain (ws-conn url)
                     (fn [conn]
                       (log/debug "Starting connection. Restart count:" n)
                       (ms/on-closed out-stream (fn [] (.close conn)))
                       (ms/connect in* conn)
                       (ms/connect conn out-stream {:downstream? false})
                       (ms/on-closed conn
                                     (fn []
                                       (log/info "connection closed")
                                       (when (not (ms/closed? out-stream))
                                         (when (= last-reset-second (quot (System/currentTimeMillis) 1000))
                                           (log/info "attempted to reset more than once per second, waiting")
                                           (Thread/sleep 1000))
                                         (log/info "restarting after connection closed")
                                         (this (inc n) (quot (System/currentTimeMillis) 1000)))))
                       (when on-connect (on-connect s))))))]
    (renew-connection)
    s))
usage:
(def subscribe-msg {:type :subscribe
                    :product_ids ["BTC-USD"]
                    :channels [:heartbeat :full]})
(def in (ms/stream))
(def out (ms/stream))
(reconnecting-websocket-stream
 {::url ""
  ::on-connect (fn [s] (ms/put! s (json/write-str subscribe-msg)))
  ::in-stream in
  ::out-stream out})
(ms/consume
 (fn [msg]
   (log/sometimes 0.002
                  (log/info msg)))
 out)
;;; to close
(ms/close! out)
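For anyone reviewing the splicing part: a minimal sketch of what `ms/splice` does on its own, using two in-memory streams instead of a real websocket. The function name `splice-round-trip` and the message strings are made up for illustration, and the buffer size of 10 is an arbitrary choice so the puts resolve without a waiting consumer.

```clojure
(require '[manifold.stream :as ms])

(defn splice-round-trip
  "Hypothetical helper: shows that puts on a spliced stream land on the
  sink, while takes from the spliced stream read from the source."
  []
  (let [in  (ms/stream 10)        ; plays the role of in-stream (sink)
        out (ms/stream 10)        ; plays the role of out-stream (source)
        s   (ms/splice in out)]   ; duplex view: write to in, read from out
    @(ms/put! s "to-server")      ; forwarded to the sink, in
    @(ms/put! out "from-server")  ; simulates a message arriving from the socket
    [@(ms/take! in) @(ms/take! s)]))

(splice-round-trip)
```

This is why `on-connect` above can simply `put!` onto `s`: the message travels through `in-stream`, then `in*`, and from there to whichever connection is currently attached.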