This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-02-15
Channels
- # announcements (8)
- # architecture (9)
- # autochrome-github (1)
- # babashka (48)
- # beginners (55)
- # calva (36)
- # cider (16)
- # clj-commons (1)
- # clj-kondo (38)
- # cljs-dev (44)
- # cljsrn (1)
- # clojure (164)
- # clojure-europe (35)
- # clojure-nl (2)
- # clojure-norway (10)
- # clojure-uk (23)
- # clojurescript (50)
- # conjure (24)
- # core-async (1)
- # cryogen (2)
- # cursive (38)
- # datalevin (11)
- # datascript (2)
- # datomic (13)
- # duct (1)
- # emacs (16)
- # events (12)
- # exercism (3)
- # figwheel-main (7)
- # fulcro (26)
- # honeysql (5)
- # integrant (1)
- # jobs (3)
- # kaocha (6)
- # lsp (72)
- # malli (22)
- # nextjournal (35)
- # nrepl (1)
- # off-topic (34)
- # pathom (5)
- # polylith (8)
- # portal (40)
- # re-frame (14)
- # reagent (42)
- # reitit (1)
- # releases (1)
- # remote-jobs (1)
- # reveal (9)
- # sci (2)
- # shadow-cljs (13)
- # sql (3)
- # tools-deps (33)
- # vim (25)
Hi everyone again - I don't want to hog the channel, but I have another question if you have the time. I'm still working through Tim Baldridge's 2013 presentation - he has an interesting example where he implements a thread pool. I have added comments and used the log function you helped me with above. Could someone please take a look at the comments and tell me whether I am understanding things correctly?
(def log-ch
  "Buffered channel (capacity 25) carrying messages that are waiting to be printed."
  (chan 25))

;; Printer process: takes messages off log-ch one at a time and prints them.
;; The loop ends when the take yields nil (log-ch closed) or a false value.
(go
  (loop []
    (let [entry (<! log-ch)]
      (when entry
        (println entry)
        (recur)))))
(defn log
  "Queues msg on log-ch from inside a go block, so the caller never blocks.
  Returns the channel of that go block."
  [msg]
  (go (>! log-ch msg)))
;;--------------------------------------------------------------------------------
;; Queue a first message on log-ch via `log`.
(log "Starting ...")
(defn thread-pool-service
  "Feeds values taken from ch to a pool of at most max-threads worker threads,
  each of which applies f to the values it receives.

  ch          - channel supplying values to be processed by f on some pool thread.
  f           - the function applied to each value taken off ch.
  max-threads - the maximum number of worker threads alive at any one time.
  timeout-ms  - how long an idle worker waits for a value on the internal
                hand-off channel before it exits and releases its thread.

  Returns the channel of the dispatcher go block. When ch is closed the
  dispatcher closes the internal hand-off channel, which lets idle workers exit."
  [ch f max-threads timeout-ms]
  (let [;; Number of worker slots currently claimed. A slot is claimed by the
        ;; dispatcher *before* the thread is started and released by the worker
        ;; when it exits, so the count can never lag behind the real number of
        ;; workers and the pool cannot overshoot max-threads.
        thread-count (atom 0)
        ;; Unbuffered hand-off channel between the dispatcher and the workers.
        buffer-chan (chan)
        ;; Worker body: keep taking values from buffer-chan and applying f.
        ;; alts!! yields a nil value when the timeout fires first or when
        ;; buffer-chan has been closed; either way the worker stops.
        thread-fn (fn []
                    (try
                      (loop []
                        (when-let [v (first (alts!! [buffer-chan (timeout timeout-ms)]))]
                          (f v)
                          (recur)))
                      (finally
                        ;; Release the slot even if f threw; otherwise the pool
                        ;; would permanently lose capacity.
                        (swap! thread-count dec)))
                    (log "Exiting..."))]
    (go (loop []
          (when-let [v (<! ch)] ;; nil means ch was closed: stop dispatching.
            ;; First try to hand v over without waiting. On an unbuffered channel
            ;; this succeeds only if an idle worker is already parked taking from
            ;; buffer-chan; otherwise :default is selected and we get false.
            (if-not (alt! [[buffer-chan v]] true
                          :default false)
              (loop []
                (if (< @thread-count max-threads)
                  ;; Room in the pool: claim a slot, queue v asynchronously and
                  ;; start a new worker to pick it up.
                  (do (swap! thread-count inc)
                      (put! buffer-chan v)
                      (thread (thread-fn)))
                  ;; Pool is full: wait up to 1000 ms for a worker to take v.
                  ;; On timeout, retry this inner loop so thread-count is
                  ;; re-checked (a worker may have exited in the meantime).
                  (when-not (alt! [[buffer-chan v]] true
                                  [(timeout 1000)] ([_] false))
                    (recur)))))
            (recur)))
        (close! buffer-chan))))
;; Channel that supplies the values to be processed by the pool.
(def exec-chan (chan))

;; Pool of up to 3 workers; an idle worker gives up its thread after 100 ms.
;; Each value is logged, then the worker sleeps for 100 ms.
(def thread-pool
  (thread-pool-service
    exec-chan
    (fn [value]
      (log value)
      (Thread/sleep 100))
    3
    100))

;; Submit one value, then pause for one second before continuing.
(>!! exec-chan "Hello World")
(Thread/sleep 1000)