2021-11-20
I have a collection and I would like to process its items in parallel. light-f and heavy-f return the same kind of result. heavy-f has a higher success rate but takes more resources and time. I'm having difficulty synchronizing the closing of channels: I'm prematurely closing either the heavy-chan or the results-chan, and the puts fail because of this. I've looked at the pipeline functions (not really what I want, since I don't care about the order of the items), https://groups.google.com/g/clojure/c/wnn1Ajd14D4, https://ask.clojure.org/index.php/342/unordered-pipeline and https://stuartsierra.com/2013/12/08/parallel-processing-with-core-async, but still can't seem to solve what I'm trying to do here. I could probably arrive at a solution from one of those links, but I was wondering if there is a cleaner way to do it.
(require '[clojure.core.async :as a :refer [chan to-chan! go-loop close! <! >! <!!]])

(defn results
  [coll n-light n-heavy]
  (let [light-chan   (to-chan! coll)
        heavy-chan   (chan 20)
        results-chan (chan 20)
        light-tasks  (doall
                      (repeatedly
                       n-light
                       #(go-loop []
                          (if-let [itm (<! light-chan)]
                            (let [res (light-f itm)]
                              (>! heavy-chan res)
                              (recur))
                            ;; NOTE: the first worker to finish closes the channel,
                            ;; which is the premature close described above
                            (close! heavy-chan)))))
        heavy-tasks  (doall
                      (repeatedly
                       n-heavy
                       #(go-loop []
                          (if-let [itm (<! heavy-chan)]
                            (if (light-f-succeeded? itm)
                              (do
                                (>! results-chan itm)
                                (recur))
                              (let [res (heavy-f itm)]
                                (>! results-chan res)
                                (recur)))
                            (close! results-chan)))))]
    (<!! (a/into [] results-chan))))
Let's start by thinking about it from the point of view of an individual task - for each item you want to try function f first, then g:
(fn [x] (let [res (f x)] (if (succeed? res) res (g res))))
Thanks @UK0810AQ2, I did think of this approach but sort of discarded it at an early stage, since I wanted to be able to control the parallelism of the heavy task. I wanted the threads working on the first, light step to be released to move on to subsequent items rather than wait for the heavy ones to complete. The code is a little simplified; the light and heavy tasks are blocking and will be invoked using async/thread. Given this, do you still think I can use transducers?
Oh, you're saying two separate pipelines, one for heavy and one for light? I'll try that. Also, how much do you think the overhead is for maintaining the order of items when using pipeline?
Yes, I mean two pipelines. The overhead of maintaining order is a function of the variance in execution time. If you don't want to maintain order just have two thread pools instead of two pipeline stages
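For reference, a minimal sketch of the two-pipeline idea (assuming the blocking light-f, heavy-f and light-f-succeeded? from the code above); note that each pipeline-blocking stage still preserves order internally:

(require '[clojure.core.async :as a])

(defn results-two-stages
  [coll n-light n-heavy]
  (let [light-in  (a/to-chan! coll)
        light-out (a/chan 20)
        out       (a/chan 20)]
    ;; stage 1: run light-f with bounded parallelism
    (a/pipeline-blocking n-light light-out (map light-f) light-in)
    ;; stage 2: pass successful results through, retry the rest with heavy-f
    (a/pipeline-blocking n-heavy out
                         (map #(if (light-f-succeeded? %) % (heavy-f %)))
                         light-out)
    (a/<!! (a/into [] out))))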
The variance is quite big, since these will be HTTP calls to various URLs on the internet with a timeout of 60s; the light one with curl and the heavy one with a headless browser.
I would very much like a solution without core.async, but won't it be hard to handle the queuing, passing items to another queue, waiting for all the tasks to complete, and collecting the results?
I can do a simple cp/upmap where each thread waits on both the light and heavy steps, but I thought I'd save time by freeing the threads doing the light task and forwarding the item to a heavy-task "pool".
Every task from the light group can just submit itself to the heavy pool with exactly the same logic
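A rough sketch of that idea with two plain executors (pool sizes are illustrative; coll, light-f, heavy-f and light-f-succeeded? are the names from the earlier code); pool shutdown and draining of the result queue are omitted, and a fuller version appears further down:

(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue))

(let [light-pool (Executors/newFixedThreadPool 40)
      heavy-pool (Executors/newFixedThreadPool 10)
      results    (LinkedBlockingQueue.)]
  (doseq [itm coll]
    (.submit ^ExecutorService light-pool
             ^Runnable
             (fn []
               (let [res (light-f itm)]
                 (if (light-f-succeeded? res)
                   (.put results res)
                   ;; free the light thread: hand the item over to the heavy pool
                   (.submit ^ExecutorService heavy-pool
                            ^Runnable #(.put results (heavy-f res)))))))))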
If your processing functions involve IO (you mentioned HTTP calls), pipeline-async is probably what you're looking for: https://clojuredocs.org/clojure.core.async/pipeline-async
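A minimal pipeline-async sketch, assuming a hypothetical non-blocking fetch-async that takes a URL and a callback; the async function receives the input and a result channel that it must put to and then close:

(require '[clojure.core.async :as a])

(defn fetch-step
  [url out]
  (fetch-async url
               (fn [resp]
                 (a/put! out resp)
                 (a/close! out))))

(let [from (a/to-chan! urls)   ; urls as in the example further down
      to   (a/chan 20)]
  (a/pipeline-async 10 to fetch-step from)
  (a/<!! (a/into [] to)))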
It still maintains order which can backpressure the entire pipeline :man-shrugging:
Well… It’s very much recommended to have a strategy to deal with backpressure at some point of the system. Avoiding order, even if you don’t need it, does not really help with backpressure, as far as I can tell.
Basically, by not maintaining order you're saying: 1. "I would like fast requests to be processed immediately." 2. "However, there might be an unspecified/unlimited number of requests that are stuck waiting." I believe those statements are in conflict with each other, as far as backpressure is concerned. Imagine a situation where you have a very high number of pending requests stuck waiting for close to 60 seconds. You have some more tasks coming in. What does the system do? Regardless of order, it needs to either: 1. abort some of the pending tasks to schedule new ones, or 2. potentially run out of memory or other resources.
In so many words what I’m trying to say is that avoiding order does not solve backpressure 🙂
All of this might be a theoretical concern, depending on how mission-critical the system is that you’re designing. A system that generates reports at midnight and fails once in a blue moon might not be the highest priority to fix or get perfectly right in terms of backpressure. If it serves customer-facing requests and/or people might lose money or suffer other consequences, that’s another story 🙂
You can create good flexible systems by combining the various pipeline options with to/from channels backed by dropping-buffer or sliding-buffer
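For illustration (not from the thread itself), a lossy variant: feeding a stage through a dropping-buffer (or sliding-buffer) channel sheds load instead of blocking upstream, assuming the blocking light-f from above:

(require '[clojure.core.async :as a])

;; When more than 100 items are pending, a dropping-buffer discards new puts;
;; a sliding-buffer would discard the oldest pending items instead.
(defn results-lossy
  [coll n-workers]
  (let [in  (a/chan (a/dropping-buffer 100))
        out (a/chan 20)]
    (a/pipeline-blocking n-workers out (map light-f) in)
    (a/onto-chan! in coll)
    (a/<!! (a/into [] out))))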
You have to manage backpressure somehow. If you can't afford data loss you can't use sliding or dropping buffers
core.async is not a panacea and because pipelines are ordered you can get undesirable behaviors
You can also end up implementing your own version of an unordered executor with core.async
“core.async is not a panacea and because pipelines are ordered you can get undesirable behaviors” - I completely agree
I managed to implement the thread-pool version and it looks good so far (using some helpers from claypoole):
(import '(java.util.concurrent ExecutorService Executors LinkedBlockingQueue))

;; cpl and impl are aliases for claypoole namespaces (the "helpers from
;; claypoole" mentioned above)
(defn process
  [[step & rmn-steps] ^LinkedBlockingQueue result-q input]
  (when-let [{:keys [pool f early-return-pred]
              :or   {early-return-pred (constantly false)}} step]
    (.submit ^ExecutorService pool
             ^Runnable (fn []
                         (let [res (f input)]
                           (if (or (early-return-pred res) (empty? rmn-steps))
                             (.put result-q res)
                             (process rmn-steps result-q res)))))))

(defn results
  [steps buffer-size coll]
  (let [result-q (LinkedBlockingQueue. (int buffer-size))
        steps    (for [{:keys [concurrency] :as step} steps]
                   (assoc step :pool (Executors/newScheduledThreadPool concurrency)))
        process  (partial process steps result-q)
        shutdown #(doseq [{:keys [pool]} steps]
                    (.shutdown ^ExecutorService pool))]
    (->> coll
         (map process)
         (#'cpl/forceahead buffer-size)
         (map (fn [_] (.take result-q)))
         (impl/seq-open shutdown))))
(comment
  (def steps
    [{:f curl/get
      :concurrency 40
      :early-return-pred #(<= (:status %) 399)}
     {:f #(browser/get (:url %))
      :concurrency 10}])

  (time (doall (results steps 10 urls))))