This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-12-28
Channels
- # beginners (2)
- # calva (8)
- # capetown (1)
- # clojure (28)
- # clojure-europe (6)
- # clojure-norway (82)
- # clojure-sweden (1)
- # clojuredesign-podcast (5)
- # clojurescript (26)
- # core-async (3)
- # cryogen (7)
- # datahike (30)
- # datomic (10)
- # figwheel-main (8)
- # honeysql (8)
- # hyperfiddle (15)
- # jobs-discuss (6)
- # lsp (6)
- # matrix (6)
- # off-topic (12)
- # overtone (1)
- # polylith (6)
- # portal (6)
- # releases (1)
- # shadow-cljs (9)
- # sql (1)
- # xtdb (5)
Hello I'm using pipeline to handled a job queue (jobs are pulled from db, and handled by multiple workers), and here is what I would want to do:
• Handling no more than 3 messages in parallel ✅
• My program should not hold more than 3 messages ❌
In other words, what i'm looking to achieve in this snippet is; if 3 message are already being handled, it would not be possible to push message to in-chan
.
Am I using the right lib, maybe core.async
is not the way to go?
(def in-chan (a/chan))
(def out-chan (a/chan (a/dropping-buffer 0)))
(def ct (atom 0))
(a/pipeline
3
out-chan
(map (fn [e]
(println "handling el" e) (Thread/sleep 20000) (println "Finish handling el" e) (assoc e :handled true)))
in-chan)
(comment
;; As pipeline dequeue message before handling them, this command is queuing messages for future handling (that I don't want)
(a/offer! in-chan {:id (swap! ct inc)}))
Thanks by advance for you help 🙂personally I would look at https://github.com/KingMob/TrueGrit#about
first for this, specifically bulkhead
Ok, due to this "bizarre" behaviour (`in-chan` accepts messages but transducer is not applied), I looked at https://github.com/clojure/core.async/blob/322adc72852bb79c143ebe5d203d100534fda8f2/src/main/clojure/cljs/core/async.cljs#L264C1-L264C25. I did not understand everything, but it seems that it declares intermediate channels, which is a start of an explanation. I ended up doing my own parallel handler which seem to do what I want:
(def in-chan (a/chan))
(def ct (atom 0))
(dotimes [_ 3]
(a/go-loop []
(when-let [e (a/