#core-async
2022-02-15
Neil Barrett 22:02:17

Hi again, everyone. I don't want to hog the channel, but I have another question if you have the time. I'm still working through Tim Baldridge's 2013 presentation, where he has an interesting example that implements a thread pool. I have added comments and used the log function you helped me with above. Could someone please take a look at the comments and tell me whether I am understanding things correctly?

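;; Assumed require: the snippet relies on these core.async names being referred.
(require '[clojure.core.async :refer [chan go go-loop <! >! >!! alt! alts!! put! thread timeout close!]])

(def log-ch (chan 25))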

(go-loop []
    (when-let [v (<! log-ch)]
      (println v)
      (recur)))
    
(defn log [msg]
  (go
    (>! log-ch msg)))

;;--------------------------------------------------------------------------------

(log "Starting ...")

(defn thread-pool-service 
  " ch            - channel supplying values to be transformed by f on some thread in the pool.
   f             - the function applied to each value taken off ch. 
   max-threads   - the maximum number of threads in the pool.
   timeout-ms    - the maximum time that a 'value processor' will wait for a value
                   to arrive on the internal hand-off channel before it releases the thread."
  [ch f max-threads timeout-ms]
  
  (let [;; Number of threads in use running 'value-processors'.
        thread-count (atom 0)  
        ;; Unbuffered hand-off channel carrying values to be transformed by f on pool threads.
        ;; Each thread runs a function ('value-processor') which repeatedly tries to take a 
        ;; value off this channel for processing by f. If no value arrives within 
        ;; timeout-ms milliseconds (or the channel is closed), the function exits and
        ;; releases the thread, decrementing thread-count.
        buffer-chan (chan) 
        ;; The function ('value-processor') which runs on a thread processing values taken 
        ;; from buffer-chan.
        thread-fn (fn []       
                    (swap! thread-count inc)
                    (loop []
                      (when-let [v (first (alts!! [buffer-chan (timeout timeout-ms)]))]
                        (f v)
                        (recur)))
                    (swap! thread-count dec)
                    (log "Exiting..."))]
    (go (loop []
          (when-let [v (<! ch)] ;; park until a value arrives on ch; nil means ch is closed and ends the loop.
            ;; Try to put v on buffer-chan without blocking. Because buffer-chan is unbuffered,
            ;; this succeeds only if a 'value-processor' is already waiting to take from it;
            ;; otherwise :default fires and alt! returns false. In that case, if the number
            ;; of threads in use is less than `max-threads`, we queue v on buffer-chan as a
            ;; pending put and start a new thread running a 'value-processor' to take it.
            ;; Otherwise, we wait up to 1000ms for a 'value-processor' to take v from
            ;; buffer-chan. If that attempt times out, the inner loop recurs and re-checks
            ;; thread-count.
            (if-not (alt! [[buffer-chan v]] true
                          :default false)
              (loop []
                (if (< @thread-count max-threads)
                  (do (put! buffer-chan v)
                      (thread (thread-fn)))
                  (when-not (alt! [[buffer-chan v]] true
                                  [(timeout 1000)] ([_] false))
                    (recur)))))
            (recur)))
        (close! buffer-chan))))

(def exec-chan (chan)) ;; The channel that supplies values to be processed by threads in the pool

(def thread-pool (thread-pool-service exec-chan ;; values to be processed
                                      (fn [x]   ;; the function to apply to each value
                                        (log x)
                                        (Thread/sleep 100)) 
                                      3         ;; the maximum number of threads in the pool
                                      100))     ;; idle timeout in ms for each value-processor

(>!! exec-chan "Hello World")

(Thread/sleep 1000)
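
The comment about the non-blocking put is the part most worth checking at the REPL. Here is a minimal sketch, assuming only `clojure.core.async` is on the classpath; `try-put!!` is a hypothetical helper (not from the presentation) that mirrors the `alt!` with `:default` used in the pool, written with the blocking `alt!!` so it can run outside a go block. It suggests that on an unbuffered channel the immediate put fails when no taker is waiting and succeeds once one is. The 100ms sleep is only there to give the taker thread time to block.

```clojure
(require '[clojure.core.async :refer [chan alt!! thread <!!]])

;; Hypothetical helper: attempt to put v on ch without blocking.
;; Returns true if the put completed immediately, false otherwise.
(defn try-put!! [ch v]
  (alt!! [[ch v]] true
         :default false))

(def c (chan)) ; unbuffered, like buffer-chan in the pool

;; No taker is waiting on c, so the put cannot complete and :default fires.
(def no-taker-result (try-put!! c :a))   ; false

;; Start a thread that blocks taking from c, and give it time to get there.
(def taken (thread (<!! c)))
(Thread/sleep 100)

;; A taker is now waiting, so the put completes immediately.
(def with-taker-result (try-put!! c :b)) ; true

;; The taker received :b; the abandoned :a put was never delivered.
(def received (<!! taken))               ; :b
```

This is why the pool only spawns a new thread when the first `alt!` returns false: at that moment no existing 'value-processor' is idle and waiting on `buffer-chan`.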