#pedestal
2018-06-06
souenzzo13:06:48

About the Jetty / WS example https://github.com/pedestal/pedestal/blob/master/samples/jetty-web-sockets/src/jetty_web_sockets/service.clj#L61 : when the server receives an on-close, how does it know the "origin" of that message (so it can remove the client from the ws-clients atom)?

dadair16:06:14

@souenzzo You basically need to close over your own machinery. @hlship has a good example in his lacinia-pedestal project that I was able to emulate for my own project; see here: https://github.com/walmartlabs/lacinia-pedestal/blob/master/src/com/walmartlabs/lacinia/pedestal/subscriptions.clj#L414

dadair16:06:42

Here’s an example from my own project:

(defn socket-listener
  "Core listening function for websocket connections/requests."
  [logger idle-timeout auth raw-input-queue sockets {:keys [pub!]}]
  (fn [req resp _]
    (let [[claims jwt-token] (user/authenticated? req auth)
          socket-id (transport/new-socket-id)
          socket-in-ch (async/chan)
          socket-out-ch (async/chan 10)
          socket-kill-ch (async/chan)
          skt (transport/make-socket
               socket-id
               (user/parse-claims claims)
               socket-out-ch
               jwt-token)
          on-connect (fn [session send-ch]
                       (let [remote-addr (.getRemoteAddress ^WebSocketSession session)
                             extra-time 60
                             timeout-ms (utils/s->ms (+ idle-timeout extra-time))]
                         (logger/log logger :report ::connect {:remote remote-addr})
                         (.setIdleTimeout ^WebSocketSession session timeout-ms))
                       (sockets/socket-sender socket-out-ch send-ch socket-kill-ch)
                       (sockets/socket-receiver socket-in-ch
                                                socket-out-ch
                                                raw-input-queue
                                                socket-kill-ch
                                                send-ch
                                                {:idle-timeout idle-timeout
                                                 :socket-id socket-id})
                       (if claims
                         (let [system-topic (transport/make-topic "bypass/system" {})]
                           (swap! sockets sockets/register-socket skt [system-topic]))
                         (do
                           (async/put! socket-out-ch (transport/envelope-unauthorized))
                           (async/<!! (async/timeout 1000))
                           (async/close! send-ch))))]
      (ws/make-ws-listener
       {:on-connect (ws/start-ws-connection on-connect)
        :on-text (fn [raw]
                   (logger/log logger :info ::received {:message raw})
                   (async/put! socket-in-ch [raw socket-id]))
        :on-error (fn [cause]
                    (logger/log logger :warn ::error {:cause cause}))
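        ;; skt and the channels come from the enclosing let, so this handler
        ;; already knows which connection is closing.
        :on-close (fn [code reason]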
                    (logger/log logger :report ::closed {:code code :reason reason})
                    (let [topics (get-in @sockets [:socket-id->topics (:id skt)])]
                      (swap! sockets sockets/unregister-socket skt)
                      (async/put! socket-kill-ch :kill)
                      (run! async/close! [socket-kill-ch socket-in-ch socket-out-ch])
                      (pub! (socket-closed-evt skt topics))))}))))
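
Stripped down to just the closure idea, the pattern above might look like the sketch below. It assumes nothing from Pedestal: `make-callbacks`, `client-id`, and the `example.ws-closure` namespace are hypothetical names for illustration, and `ws-clients` stands in for the atom from the sample. In a real service, the returned `:on-connect` and `:on-close` functions would be handed to `ws/start-ws-connection` and `ws/make-ws-listener`, as in the snippet above.

```clojure
(ns example.ws-closure
  (:import (java.util UUID)))

;; Registry of open connections: client-id -> send channel.
(defonce ws-clients (atom {}))

(defn make-callbacks
  "Builds the callbacks for ONE connection. A fresh client-id is created
  here, and both callbacks close over it, so :on-close knows which entry
  to remove without being told by the caller."
  [clients]
  (let [client-id (str (UUID/randomUUID))]
    {:client-id  client-id
     ;; Same argument shape as the on-connect fn above: session, send-ch.
     :on-connect (fn [_session send-ch]
                   (swap! clients assoc client-id send-ch))
     ;; code/reason say why the socket closed; the identity comes from
     ;; the closure, not from the arguments.
     :on-close   (fn [_code _reason]
                   (swap! clients dissoc client-id))}))

;; Usage: two simulated connections, then the first one closes.
(comment
  (let [a (make-callbacks ws-clients)
        b (make-callbacks ws-clients)]
    ((:on-connect a) nil :send-ch-a)
    ((:on-connect b) nil :send-ch-b)
    ((:on-close a) 1000 "bye")
    ;; Only b's entry should remain in the registry.
    (= [(:client-id b)] (keys @ws-clients))))
```

The point is that the callbacks are built per connection, not shared: a single global :on-close has no way to tell connections apart, while one created inside a per-request function carries its own id.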
