#aleph
2022-07-13
kingmob07:07:50

@lepistane You mean, a client that has just fallen silent, instead of sending a WS Close control frame? It’s usually impossible to distinguish between a client that has died vs one that is technically still active but very slow/delayed. The usual answer is to set an idle timeout, which is the idle-timeout param when starting the server.

I can see how a zombie client that’s effectively dead, but still sending heartbeat Ping frames, could be a problem (like if a client thread responsible for sending data crashed, but a separate heartbeat thread was still going), so you may need to detect that manually. I haven’t tested it, but I think the same issue would happen if you told the server to send Pings, but forgot to set the timeout field in heartbeats, in which case the server’s Ping frames would keep the conn alive, even if it didn’t get a Pong response.

Closing a stream will eventually shut down a WS connection once drained, but if you don’t want to wait, you can call aleph.http/websocket-close!

You shouldn’t need to manually clear anything. Let GC take care of it if/when it gets to it. Under the hood, Netty pools and reuses a lot of resources to minimize expensive object construction.
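A minimal sketch of the options mentioned in this message, assuming Aleph 0.5.x and Manifold are on the classpath. The namespace, port, handler names, close reason, and millisecond values are placeholders chosen for illustration, not recommendations.

```clojure
(ns example.ws-timeouts
  (:require [aleph.http :as http]
            [manifold.deferred :as d]
            [manifold.stream :as s]))

;; Hypothetical heartbeat settings. Leaving out :timeout is the case
;; described above, where Pings could keep a dead conn alive.
(def heartbeat-opts
  {:send-after-idle 20000   ; send a Ping after 20 s without traffic
   :timeout         10000}) ; give up if no Pong arrives within 10 s

(defn ws-handler
  "Upgrades the request to a websocket and echoes messages back."
  [request]
  (d/chain
    (http/websocket-connection request {:heartbeats heartbeat-opts})
    (fn [conn]
      (s/connect conn conn))))

(defn start!
  "Starts a server whose idle connections are closed after 60 s."
  []
  (http/start-server ws-handler {:port 8080 :idle-timeout 60000}))

(defn force-close!
  "Sends a Close frame right away instead of waiting for a drain."
  [conn]
  (http/websocket-close! conn 1000 "bye"))
```

Status code 1000 is the standard "normal closure" code; any other valid close code could be passed instead.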

kingmob08:07:22

This is your example code, right?

(defn handler [conn]
  (fn [raw-command]
    (try
      (let [cmd (->edn raw-command)]
        ;; do stuff to conn, stream/put conn type of thing
        )
      ;; catch must be a direct child of try, not nested inside the let
      (catch Exception e
        (timbre/error e "Exception caught in command handler")
        (stream/put! conn (->json {:status  :error
                                   :message (:cause (Throwable->map e))}))))))

(defn sub-and-serve [request]
  (let [twenty-seconds 20000
        ten-seconds 10000]
    (d/let-flow [conn (http/websocket-connection request
                                                 {:heartbeats {:send-after-idle twenty-seconds
                                                               :timeout         ten-seconds}})]
                (stream/put! conn (->json {:message "Events incoming..."}))
                (stream/consume (handler conn) conn))
    nil))
I think the “do stuff” is hiding something. This server basically sends endless messages to any clients that request it? If so, I can think of one possibility. The current idle state handler only shuts the conn down when both the server and the client are idle, but if the server sends at least one message every 20 seconds, the conn’s never idle.

So really, the question is, what’s happening on the client side? On reddit, you said:

> The thing that bothers me is that when frontend disconnects/closes connection, backend continues to serve those messages to that aleph ws connection.
> Frontend is disconnected manually - that’s fine and expected, but backend continues to put messages into a connection stream.

How are you closing the connection on the client side? And what behavior are you seeing? Even if the idle handler isn’t being triggered, if the client ever sent a Close frame, the server should close that connection pretty quickly.

lepistane08:07:05

@kingmob This is the whole code. It's very much WIP, and maybe I should've been more specific about what I am trying to do. I was trying to reduce complexity. I apologize.

(defn ->json [edn]
  (json/generate-string edn))


(defn ->edn [json]
  (json/parse-string json true))


(defn ws-send [sock msg]
  (timbre/info "Sending message: " msg)
  (s/put! sock msg))


(defn ws-close [sock]
  (timbre/info "Closing connection...")
  (s/close! sock))


(defstate connections
  :start (atom {:clients {}
                :bots {}})
  :stop (do (when-let [conns (vals (:clients @connections))]
              (doseq [c conns]
                (ws-close c)))
            (atom {})))


(defn ws-conn
  [uri & {:keys [on-connect on-msg on-close aleph]}]
  (let [sock (http/websocket-client uri aleph)
        handle-messages (fn [sock]
                          (d/chain
                            (s/consume (fn [msg] (on-msg sock msg)) sock)
                            (fn [sock-closed] sock)))
        handle-shutdown (fn [sock]
                          (let [state (:sink (s/description sock))]
                            (on-close
                              sock {:stat (:websocket-close-code state)
                                    :desc (:websocket-close-msg state)})))]
    (d/chain sock #(doto % on-connect) handle-messages handle-shutdown)
    @sock))


(defn create-connection
  "ref: "
  [conn id {:keys [fixture] :as cmd}]
  (let [url (get (:game-server-ws config) fixture)
        json-cmd (->json cmd)]
    (when-let [connection (ws-conn url
                                   {:on-connect (fn [sock]
                                                  (timbre/info "Websocket Client Connected.")
                                                  (ws-send sock json-cmd))
                                    :on-msg (fn [sock msg]
                                              (timbre/info "Passing to client following message: " msg)
                                              (stream/put! conn msg))
                                    :on-close (fn [sock msg]
                                                (timbre/info "Websocket Client Closed." msg))})]
      (swap! connections assoc-in [:clients id] connection))))


(defn handler [conn id]
  (fn [raw-command]
    (try
      (let [cmd (->edn raw-command)]
        (if-let [existing-conn (get-in @connections [:clients id])]
          (ws-send existing-conn (->json cmd))
          (create-connection conn id cmd)))
      (catch Exception e
        (timbre/error e "Exception caught in command handler")
        (stream/put! conn (->json {:status :error
                                   :message (:cause (Throwable->map e))}))))))


(defn sub-and-serve [request]
  (let [twenty-seconds 20000
        ten-seconds 10000
        id (random-uuid)]
    (d/let-flow [conn (http/websocket-connection request {:heartbeats {:send-after-idle twenty-seconds
                                                                       :timeout ten-seconds
                                                                       :idle-timeout ten-seconds}})]
                (stream/put! conn (->json {:message "Events incoming..."}))
                (stream/consume (handler conn id) conn))
    nil))
So I am doing a very naughty thing: I am creating a middleman between the frontend and the actual backend. I don't want to change the frontend and I don't want to change the backend (that's a requirement). I want the middleman to connect to the backend and wire messages to the frontend. (I am using Postman to simulate the frontend client.)

The reason I am asking about client disconnects is that, if I don't handle them, the connections atom will only grow, because when I 'disconnect' using Postman the middleman (code above) doesn't react. Messages from the backend keep coming (and being logged, as seen in on-msg).

I could be going about this in a very wrong way, and I am open to suggestions.

kingmob09:07:56

@lepistane So, sub-and-serve and handler serve the frontend, and then create-connection and ws-conn talk to the backend, right? (I think it would help to more clearly separate back and front in this code…) How are you disconnecting with Postman? Are you sending a WS Close frame? I’ve never used it for WS. I can easily see it keeping the connection open indefinitely, by responding to Ping frames the middle sends.

lepistane09:07:00

@kingmob that's correct. Noted. There is a button connect/disconnect. I am not sure if it's sending the WS close frame. Pretty sure it's not but i couldn't find confirmation online. I've tested using another websocket client and that's Smart Websocket Client (chrome extension) - behavior is exactly the same as postman.

lepistane13:07:39

Solved

(defn sub-and-serve [request]
  (let [twenty-seconds 20000
        ten-seconds 10000
        id (random-uuid)]
    (d/let-flow [conn (http/websocket-connection request {:heartbeats {:send-after-idle twenty-seconds
                                                                       :timeout ten-seconds}})]
                (stream/on-drained conn (fn []
                                          (timbre/debug "Drained connection: " conn)
                                          (timbre/debug "Client disconnected")
                                          ;;reset
                                          ))
                (stream/put! conn (->json {:message "Events incoming..."}))
                (stream/consume (handler conn id) conn))
    nil))
I needed to add on-drained. That triggers when the frontend disconnects.
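To illustrate the callback in isolation, here is a small sketch that uses a plain Manifold stream as a stand-in for the websocket connection. The helper name `watch-disconnect` and the promise-based signalling are assumptions made for the example; only `manifold.stream/on-drained` and `close!` come from the library.

```clojure
(require '[manifold.stream :as s])

(defn watch-disconnect
  "Registers an on-drained callback on `conn` and returns a promise
  that is delivered once the stream is closed and fully consumed."
  [conn]
  (let [done (promise)]
    (s/on-drained conn #(deliver done :drained))
    done))

;; A plain stream standing in for the frontend websocket.
(def conn (s/stream))
(def done (watch-disconnect conn))

;; Simulate the frontend going away, then wait up to 1 s for the callback.
(s/close! conn)
(deref done 1000 :timed-out)
```

In the real handler the callback body is where cleanup belongs, such as closing the matching backend connection.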

lepistane13:07:15

friend helped i have no idea how he knew about it

lepistane13:07:22

thanks for your time as well!

lepistane13:07:41

Just wanna mention that https://clojars.org/aleph doesn't have [aleph "0.5.0"], even though that version is mentioned on the git repo. [aleph "0.5.0-rc3"] is the latest on clojars.

kingmob13:07:33

I’m aware, but thx for letting me know. I was in the middle of cutting a 0.5.0 release, but it got delayed by a slow timing bug. The code is pushed to Github (if not, http://cljdoc.org won’t build docs correctly), but no artifact has been pushed to clojars yet.

kingmob13:07:26

@lepistane Glad to hear your problem is gone! But, are you doing something else in the real on-drained callback (maybe in the ;;reset)? Adding a logging callback itself shouldn’t fix anything. More worryingly, I would have thought that https://github.com/clj-commons/aleph/blob/master/src/aleph/http/server.clj#L621-L628 would have taken care of closing it the same way. I might need to look into websocket closing behavior when the underlying streams are closed but websocket-close! isn’t used.

lepistane13:07:15

yes yes, i am just closing the connection from the middleman to backend. This is the final version

(stream/on-drained conn (fn []
                          (timbre/debug "Drained connection: " conn)
                          (timbre/debug "Client disconnected: " id)
                          (swap! client-connections (fn [state]
                                                      (when-let [gs-conn (get state id)]
                                                        (ws-close gs-conn))
                                                      (dissoc state id)))))
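One caveat with this shape: the function passed to swap! may be retried under contention, so a side effect inside it (here, closing the backend connection) could run more than once. A possible alternative, sketched with only core Clojure (1.9+ for swap-vals!), removes the entry first and closes afterwards. The `remove-client!` helper and its `close-fn` parameter are hypothetical names for this example.

```clojure
;; Hypothetical registry: frontend id -> backend connection.
(def client-connections (atom {}))

(defn remove-client!
  "Atomically removes `id` from the registry, then calls `close-fn`
  once on the removed connection, outside the swap function.
  Returns the removed connection, or nil if there was none."
  [id close-fn]
  (let [[old _new] (swap-vals! client-connections dissoc id)]
    (when-let [gs-conn (get old id)]
      (close-fn gs-conn)
      gs-conn)))
```

Inside the on-drained callback this would be called as `(remove-client! id ws-close)`.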

kingmob14:07:03

@lepistane Ahh, of course, now it all makes sense. Your issue was the backend ws connections still existed even when the associated frontend ws conns were closed. Is that right? (BTW, I double-checked, and calling manifold.stream/close! on the ws stream works just fine to close it, too.) Anyway, if you want to wire up the frontend ws stream to the backend ws stream in a way that’s 1) much simpler, and 2) will automatically propagate closing, look into manifold.stream’s connect and connect-via fns
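A rough sketch of what that wiring could look like, assuming both arguments are duplex Manifold streams such as Aleph websocket connections. The `bridge!` name is hypothetical, and the use of `:upstream? true` reflects one reading of the connect options (close the source when its sink closes); the default `:downstream?` behavior closes the sink once the source is drained.

```clojure
(require '[manifold.stream :as s])

(defn bridge!
  "Pipes messages both ways between two duplex streams. Intended for
  websocket conns, whose sink and source halves are distinct; two
  plain streams wired this way would echo messages forever."
  [frontend backend]
  ;; frontend -> backend: closing the frontend closes the backend.
  (s/connect frontend backend {:upstream? true})
  ;; backend -> frontend: closing the backend closes the frontend.
  (s/connect backend frontend {:upstream? true}))
```

With this in place, the middleman would not need the connections atom just to track which backend conn to close; connect-via is the variant to reach for if messages need transforming on the way through.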

lepistane14:07:05

Yes that's right. That's great ! Thanks i will check it out
