Fork me on GitHub
#core-async
<
2023-12-28
>
babardo10:12:12

Hello I'm using pipeline to handled a job queue (jobs are pulled from db, and handled by multiple workers), and here is what I would want to do: • Handling no more than 3 messages in parallel • My program should not hold more than 3 messages In other words, what i'm looking to achieve in this snippet is; if 3 message are already being handled, it would not be possible to push message to in-chan . Am I using the right lib, maybe core.async is not the way to go?

(def in-chan (a/chan))
(def out-chan (a/chan (a/dropping-buffer 0)))
(def ct (atom 0))

(a/pipeline
 3
 out-chan
 (map (fn [e]
        (println "handling el" e) (Thread/sleep 20000) (println "Finish handling el" e) (assoc e :handled true)))
 in-chan)

(comment
  ;; As pipeline dequeue message before handling them, this command is queuing messages for future handling (that I don't want)
  (a/offer! in-chan {:id (swap! ct inc)}))
Thanks by advance for you help 🙂

skynet14:12:49

personally I would look at https://github.com/KingMob/TrueGrit#about first for this, specifically bulkhead

🙏 1
babardo15:12:05

Ok, due to this "bizarre" behaviour (`in-chan` accepts messages but transducer is not applied), I looked at https://github.com/clojure/core.async/blob/322adc72852bb79c143ebe5d203d100534fda8f2/src/main/clojure/cljs/core/async.cljs#L264C1-L264C25. I did not understand everything, but it seems that it declares intermediate channels, which is a start of an explanation. I ended up doing my own parallel handler which seem to do what I want:

(def in-chan (a/chan))
(def ct (atom 0))

(dotimes [_ 3]
  (a/go-loop []
    (when-let [e (a/