#core-async
2021-11-20
indy13:11:59

I have a collection whose items I would like to process in parallel. light-f and heavy-f return the same result; heavy-f has a higher success rate but takes more resources and time to process. I’m having difficulty synchronizing the closing of channels: I prematurely close either heavy-chan or results-chan, and the puts then fail. I’ve looked at the pipeline functions (not really what I want, since I don’t care about the order of the items), https://groups.google.com/g/clojure/c/wnn1Ajd14D4, https://ask.clojure.org/index.php/342/unordered-pipeline and https://stuartsierra.com/2013/12/08/parallel-processing-with-core-async but still can’t seem to solve what I’m trying here. I could probably arrive at a solution from one of those links, but I was wondering if there was a cleaner way to do it.

(defn results
  [coll n-light n-heavy]
  (let [light-chan   (to-chan! coll)
        heavy-chan   (chan 20)
        results-chan (chan 20)
        light-tasks  (doall
                      (repeatedly
                       n-light
                       #(go-loop []
                          (if-let [itm (<! light-chan)]
                            (let [res (light-f itm)]
                              (>! heavy-chan res)
                              (recur))
                            (close! heavy-chan)))))

        heavy-tasks  (doall
                      (repeatedly
                       n-heavy
                       #(go-loop []
                          (if-let [itm (<! heavy-chan)]
                            (if (light-f-succeeded? itm)
                              (do
                                (>! results-chan itm)
                                (recur))
                              (let [res (heavy-f itm)]
                                (>! results-chan res)
                                (recur)))
                            (close! results-chan)))))]
    (<!! (a/into [] results-chan))))

Ben Sless14:11:53

Let's start by thinking about it from the point of view of an individual task: you want to try function f first, then fall back to g:

(fn [x] (let [res (f x)] (if (succeed? res) res (g res))))

Ben Sless14:11:12

Then you can break it in two steps

Ben Sless14:11:22

(def step1 (map f))

Ben Sless14:11:50

(def step2 (map #(if (succeed? %) % (g %))))

Ben Sless14:11:16

Then you can stick these transducers in pipelines for parallel execution

Ben Sless14:11:25

to start/stop

Ben Sless14:11:41

you need to first create a channel of all your items with to-chan! (or put them onto an existing channel with onto-chan!)

Ben Sless14:11:56

you can collect the final output channel using reduce
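
A minimal sketch of the two-pipeline idea described above, assuming core.async 1.2 or later is on the classpath. The bodies of `f`, `g` and `succeed?` are hypothetical stand-ins, and `run-two-stage` is an illustrative name. `pipeline-blocking` is used because the real steps are blocking calls; note that it preserves input order.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-ins for the light step, the heavy step and the check.
(defn f [x] (if (even? x) {:ok x} {:failed x}))
(defn g [res] {:ok (:failed res)})
(defn succeed? [res] (contains? res :ok))

(def step1 (map f))
(def step2 (map #(if (succeed? %) % (g %))))

(defn run-two-stage
  "Runs step1 with n-light threads and step2 with n-heavy threads."
  [coll n-light n-heavy]
  (let [in  (a/to-chan! coll)   ; closes itself once coll is exhausted
        mid (a/chan n-light)
        out (a/chan n-heavy)]
    ;; Each pipeline closes its output channel when its input closes,
    ;; so no manual close! is needed.
    (a/pipeline-blocking n-light mid step1 in)
    (a/pipeline-blocking n-heavy out step2 mid)
    (a/<!! (a/reduce conj [] out))))
```

Calling `(run-two-stage (range 4) 2 2)` returns one result per input item, in input order.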

indy14:11:24

Thanks @UK0810AQ2, I did think of this approach but sort of discarded it at an early stage since I wanted to be able to control the parallelism of the heavy task. I wanted the threads working on the first light step to be released and move on to the subsequent items and not wait for the heavy ones to complete. The code is a little simplified, the light and heavy tasks are blocking and will be invoked using async/thread. With this situation do you still think I can use transducers?

indy14:11:23

Oh you’re saying two separate pipelines, one for heavy and one for light? I’ll try that. Also how much do you think the overhead is for maintaining the order of items when using pipeline?

Ben Sless14:11:58

Yes, I mean two pipelines. The overhead of maintaining order is a function of the variance in execution time. If you don't want to maintain order just have two thread pools instead of two pipeline stages

Ben Sless14:11:04

Then you don't even need core async

indy14:11:17

The variance is quite a bit, since these will be HTTP calls to various URLs on the internet with a timeout of 60s, the light one with curl and the heavy one with a headless browser.

indy14:11:52

I would very much like a solution without core.async, but won’t it be hard to go about queuing, passing items to another queue, waiting for all the tasks to complete, and collecting the results?

indy14:11:52

I can do a simple cp/upmap where each thread waits on both the light and the heavy task, but I thought I’d save time by freeing the threads doing the light task and forwarding the work to a heavy task “pool”.

Ben Sless15:11:35

Every task from the light group can just submit itself to the heavy pool with exactly the same logic
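
A rough sketch of that suggestion using only java.util.concurrent, with no core.async. The names `run-two-pools`, `light-f`, `heavy-f` and `succeeded?` are hypothetical. It assumes neither function throws; if one did, the latch would never reach zero and the caller would wait forever.

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue CountDownLatch
                               Executors ExecutorService))

(defn run-two-pools
  "Runs light-f on every item; failed results resubmit themselves to the
  heavy pool. Returns all results in completion order."
  [light-f heavy-f succeeded? n-light n-heavy coll]
  (let [items      (vec coll)
        ^ExecutorService light-pool (Executors/newFixedThreadPool n-light)
        ^ExecutorService heavy-pool (Executors/newFixedThreadPool n-heavy)
        results    (ConcurrentLinkedQueue.)
        ;; One count per input item tells us when everything has finished.
        done       (CountDownLatch. (count items))
        finish!    (fn [res]
                     (.add results res)
                     (.countDown done))]
    (try
      (doseq [item items]
        (.execute light-pool
                  (fn []
                    (let [res (light-f item)]
                      (if (succeeded? res)
                        (finish! res)
                        ;; The light thread is freed right after this submit.
                        (.execute heavy-pool
                                  (fn [] (finish! (heavy-f res)))))))))
      (.await done)
      (vec results)
      (finally
        (.shutdown light-pool)
        (.shutdown heavy-pool)))))
```

The latch is what answers "are all tasks finished?": each item counts down exactly once, whichever pool ends up completing it.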

indy16:11:45

I’ll explore this track, thanks :)

indy16:11:35

I think the simple threadpool approach makes the most sense, thanks again for the nudge

Ben Sless16:11:00

Welcome. Sometimes the simple answer is enough 🙂

raspasov08:11:13

If your processing functions involve IO (you mentioned HTTP calls), pipeline-async is probably what you’re looking for https://clojuredocs.org/clojure.core.async/pipeline-async

Ben Sless08:11:29

It still maintains order which can backpressure the entire pipeline :man-shrugging:

raspasov09:11:11

Well… It’s very much recommended to have a strategy to deal with backpressure at some point in the system. Avoiding order, even if you don’t need it, does not really help with backpressure, as far as I can tell.

raspasov09:11:50

Basically, by not maintaining order you’re saying: 1. “I would like fast requests to be processed immediately.” 2. “However, there might be an unspecified/unlimited number of requests that are stuck waiting.” I believe those statements are in conflict with each other, as far as backpressure is concerned. Imagine a situation where you have a very high number of pending requests stuck waiting for close to 60 seconds. You have some more tasks coming in. What does the system do? Regardless of order, it needs to either: 1. Abort some of the pending tasks to schedule new ones, or 2. Potentially run out of memory or other resources.

raspasov09:11:06

In so many words what I’m trying to say is that avoiding order does not solve backpressure 🙂

raspasov09:11:59

All of this might be a theoretical concern, depending on how mission-critical the system is that you’re designing. A system that generates reports at midnight and fails once in a blue moon might not be the highest priority to fix or get perfectly right in terms of backpressure. If it serves customer-facing requests and/or people might lose money or suffer other consequences, that’s another story 🙂

raspasov09:11:25

You can create good flexible systems by combining the various pipeline options with to/from channels with dropping-buffer or sliding-buffer
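
For reference, a small sketch of how the two lossy buffers behave when a channel is overfilled, assuming core.async is on the classpath. `fill-and-drain` is a hypothetical helper written for this illustration.

```clojure
(require '[clojure.core.async :as a])

(defn fill-and-drain
  "Puts 0..4 onto a channel backed by buf, then returns what survived."
  [buf]
  (let [ch (a/chan buf)]
    ;; Puts on dropping/sliding buffers never block, even when full.
    (doseq [x (range 5)]
      (a/>!! ch x))
    (a/close! ch)
    (a/<!! (a/into [] ch))))

(fill-and-drain (a/dropping-buffer 2)) ; => [0 1], newest items are dropped
(fill-and-drain (a/sliding-buffer 2))  ; => [3 4], oldest items are dropped
```

Either way three of the five items are lost, so these buffers only fit workloads that can tolerate data loss.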

Ben Sless09:11:14

You have to manage backpressure somehow. If you can't afford data loss you can't use sliding or dropping buffers

Ben Sless09:11:27

a semaphore or a blocking queue and passing things between executors is enough
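
One possible reading of the semaphore option, sketched with plain java.util.concurrent. `submit-bounded!` is a hypothetical helper: the submitting thread blocks once the permit limit is reached, which gives backpressure without losing any items.

```clojure
(import '(java.util.concurrent Executors ExecutorService Semaphore TimeUnit))

(defn submit-bounded!
  "Blocks the caller until a permit is free, then runs (f x) on pool.
  The permit is released when the task finishes, even if f throws."
  [^ExecutorService pool ^Semaphore permits f x]
  (.acquire permits)
  (try
    (.execute pool (fn []
                     (try
                       (f x)
                       (finally
                         (.release permits)))))
    (catch Throwable t
      ;; The task was never scheduled, so hand the permit back.
      (.release permits)
      (throw t))))

(defn run-bounded
  "Applies f to every item with at most max-pending tasks queued or running."
  [f n-threads max-pending coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n-threads)
        permits (Semaphore. max-pending)
        results (atom [])]
    (doseq [x coll]
      (submit-bounded! pool permits #(swap! results conj (f %)) x))
    (.shutdown pool)
    (.awaitTermination pool 60 TimeUnit/SECONDS)
    @results))
```

The 60 second wait in `run-bounded` is an arbitrary choice for the sketch; results arrive in completion order, not input order.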

Ben Sless09:11:04

core.async is not a panacea and because pipelines are ordered you can get undesirable behaviors

Ben Sless09:11:47

You can also end up implementing your own version of an unordered executor with core.async

raspasov09:11:52

“core.async is not a panacea and because pipelines are ordered you can get undesirable behaviors” - I completely agree

indy10:11:30

I managed to implement the thread-pool version and it looks good so far (using some helpers from claypoole),

(defn process
  [[step & rmn-steps] ^LinkedBlockingQueue result-q input]
  (when-let [{:keys [pool f early-return-pred]
              :or   {early-return-pred (constantly false)}} step]
    (.submit ^ExecutorService pool
             ^Runnable (fn []
                         (let [res (f input)]
                           (if (or (early-return-pred res) (empty? rmn-steps))
                             (.put result-q res)
                             (process rmn-steps result-q res)))))))

(defn results
  [steps buffer-size coll]
  (let [result-q (LinkedBlockingQueue. (int buffer-size))
        steps    (for [{:keys [concurrency] :as step} steps]
                   (assoc step :pool (Executors/newScheduledThreadPool concurrency)))
        process  (partial process steps result-q)
        shutdown #(doseq [{:keys [pool]} steps]
                    (.shutdown ^ExecutorService pool))]
    (->> coll
         (map process)
         (#'cpl/forceahead buffer-size)
         (map (fn [_] (.take result-q)))
         (impl/seq-open shutdown))))

(comment
 (def steps
   [{:f                 curl/get
     :concurrency       40
     :early-return-pred #(<= (:status %) 399)}
    {:f           #(browser/get (:url %))
     :concurrency 10}])

 (time (doall (results steps 10 urls))))

indy10:11:00

I guess I could use seque instead of cpl/forceahead

indy13:11:53

The problem basically boils down to, how do I know if all the tasks are finished and how do I then close the corresponding channels.
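
One common way to handle this in core.async, sketched under assumptions: every `go-loop` returns a channel that closes when the loop exits, so a single coordinating `go` block can wait on all workers and close the downstream channel exactly once. `run-workers` and `results-sketch` are hypothetical names, `f` must not return nil, and for blocking work the `go-loop` would be swapped for `a/thread` with `<!!` and `>!!`.

```clojure
(require '[clojure.core.async :as a])

(defn run-workers
  "Starts n workers that read from in, apply f, and write to out.
  Closes out only after every worker has seen in close."
  [n f in out]
  (let [workers (doall
                 (repeatedly n
                             #(a/go-loop []
                                (when-some [x (a/<! in)]
                                  (a/>! out (f x))
                                  (recur)))))]
    ;; Wait for each worker's own channel to close, then close out once.
    (a/go
      (doseq [w workers]
        (a/<! w))
      (a/close! out))
    out))

(defn results-sketch
  [coll n-light n-heavy light-f heavy-f succeeded?]
  (let [in  (a/to-chan! coll)
        mid (a/chan 20)
        out (a/chan 20)]
    (run-workers n-light light-f in mid)
    (run-workers n-heavy #(if (succeeded? %) % (heavy-f %)) mid out)
    (a/<!! (a/into [] out))))
```

No worker closes a shared channel itself, so a fast worker can no longer cut off puts from its slower siblings.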