Fork me on GitHub
#code-reviews
<
2022-10-15
>
stopa17:10:46

Use of future + loop as workers Hey team, I am building a system that works over websockets. I expose messages over two queues. I then have workers that take from the queue, like so:

(defn handle-ws-open [{:keys [id
                              send-channel
                              handle-incoming-message
                              send-outgoing-message
                              transform-outgoing-message]}]
  (let [incoming-message-q (LinkedBlockingDeque.)
        outgoing-message-q (LinkedBlockingDeque.)
        session
        {:id id
         :incoming-message-q incoming-message-q
         :outgoing-message-q outgoing-message-q
         :send-channel send-channel
         :workers
         {:ping (future
                  (loop []
                    (when (empty? outgoing-message-q)
                      (.add outgoing-message-q {:op :ping}))
                    (Thread/sleep 5000)
                    (recur)))
          :incoming (future
                      (loop []
                        (let [message (.take incoming-message-q)
                              session (get-session! id)]
                          (handle-incoming-message
                           {:session session
                            :message message})
                          (recur))))
          :outgoing (future
                      (loop []
                        (let [message (.take outgoing-message-q)
                              session (get-session! id)]
                          (send-outgoing-message
                           {:session session
                            :message (transform-outgoing-message
                                      {:session session :message message})})
                          (recur))))}}]
Main question: What are your thoughts on future + loop? This is my solution to abstract workers. I worry about two things though: • Error handling. What if the future fails? It would be nice to notify the parent and drop the socket. I am not sure the best way to do that • Making sure I “cancel” these futures at some point. Right now I rely on the fact that if the socket closes, I cancel the futures. But I worry it’s brittle. How would you approach this? Would you also go with future + loop, or something else?

Rupert (All Street)08:10:56

A few comments: • If you have enough CPU/Memory then JVM/Operating system can handle a few thousand futures relatively well. So this will put an upper bound on the number of users you can have concurrently per server. • 3 futures per socket does feel a bit heavy weight - by may be justified if you are not expecting a high number of users per server. • Each future is a thread and has its own stack (typically around 0.5MB) so 1,000 futures = 500 MB of memory just for stacks. You can adjust the size of the stack with JVM param. • Core async library is another approach (either with the standard bounded thread pool or by using the unbounded thread pool). Just be careful not to do anything slow on the bounded core async threadpool as you can end up blocking/slowing down other core async threads (use the unbounded threadpool for blocking things). • If you used non blocking .take and non blocking send/handle put you could use just a single future/thread for the whole application which would iterate across all sockets and send recieive messages. • Futures return an object that you can call cancel on - this isn't always effective -so you are sometimes better off signalling to a thread by something like an atom to tell it to stop processing. • You can wrap (future (try.. (catch)) to find out it if your operation has failed..

stopa14:10:56

Great feedback, thank you Rupert!

👍 1