#core-async
2022-05-31
maleghast12:05:56

Hi... Does anyone know how to drain a core.async channel into a list, or better yet a lazy sequence, so that the results of a channel can be passed to a function that needs a list or lazy-seq?

ghadi12:05:55

draining into a list is (async/into [] ch)

ghadi12:05:12

note that it returns a channel - not the vector

maleghast12:05:51

@ghadi - I need the vector AND I need to wait until all the calls placed on the channel have completed.

ghadi12:05:10

the vector will not be returned until ch closes

ghadi12:05:30

is that what you mean?

maleghast12:05:55

So how do I wait for the channel to be ready to close?

ghadi12:05:21

I don't understand

maleghast12:05:34

(defn get-conversion-data-backfill
  "Function to organise the backfill and execute it"
  [affiliate-site-id start-date]
  (let [request-chan (async/chan)
        chunks (get-backfill-chunk-start-end-dates (:since @*params))]
    (doseq [chunk chunks]
      (thread
        (async/>!!
         request-chan
         (get-conversion-data
          affiliate-site-id
          (first chunk)
          (validate-end-date (last chunk))))))
    (go
      (while true
        (let [report-map (async/<! request-chan)
              out '()]
          (conj out (:result report-map))
          out)))))

ghadi12:05:35

which channel, the channel returned by async/into, or ch?

ghadi12:05:23

bring back the stuff you just nixed

ghadi12:05:38

whenever you <! take from a channel, it either returns a value or nil, indicating the channel has closed

ghadi12:05:37

instead of while true, you need (if-some [val (<! ch)] ...)
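
A minimal sketch of that loop (ch, v, and acc are illustrative names, and async/to-chan! stands in for a channel the producer eventually closes):

(require '[clojure.core.async :as async :refer [go-loop <! <!!]])

(let [ch (async/to-chan! [1 2 3])]     ; a channel that closes after three values
  (<!! (go-loop [acc []]
         (if-some [v (<! ch)]
           (recur (conj acc v))        ; got a value, keep accumulating
           acc))))                     ; nil means ch closed: return what we have
;; => [1 2 3]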

ghadi12:05:00

but you can just use async/into instead to wait for all the results

ghadi12:05:14

the other thing you need is:

ghadi12:05:17

after the doseq

ghadi12:05:42

actually you only have one channel here, you're putting and taking from the request channel

ghadi12:05:04

are you just trying to fan out some computation?

ghadi12:05:30

if so, make request & response channels

maleghast12:05:30

I am fanning out HTTP i/o

ghadi12:05:47

bbiab ... look at pipeline-async, it's probably much better for your use case

otfrom13:05:25

@maleghast claypoole might do what you want too, though core.async also fits, depending on the use case

otfrom13:05:59

I'm using claypoole for embarrassingly parallel things, but core.async is probably better if your problem is actually concurrent

maleghast13:05:42

I think that based on the situation I am in I might convert to claypoole as that's the way things are normally done around here...

otfrom13:05:18

I've had a lot of good mileage with upmap in claypoole

maleghast13:05:33

Initially I was pulling things off the channel and writing to files, but to fit in with existing approaches I need the results to be returned as a lazy-seq or a list

otfrom13:05:59

I'd be into understanding better when to choose pipeline-async and when to choose claypoole. In my case knowing that the order didn't matter is what switched me

maleghast13:05:28

I don't care about order in this case

maleghast13:05:05

(which choice is better if order does not matter?)

otfrom13:05:18

for me it was upmap

otfrom13:05:39

pipeline maintains order
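
A minimal claypoole sketch for comparison (fetch-report, the pool size, and the inputs are made-up stand-ins; upmap yields results in completion order rather than input order):

(require '[com.climate.claypoole :as cp])

;; fetch-report is a hypothetical stand-in for the real HTTP call.
(defn fetch-report [chunk]
  (Thread/sleep (rand-int 100))
  {:chunk chunk :status :ok})

(cp/with-shutdown! [pool (cp/threadpool 17)]  ; bounded pool = the concurrency cap
  (doall (cp/upmap pool fetch-report (range 17))))
;; => a seq of 17 result maps, ordered by completion, not input order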

ghadi13:05:57

ok, back:

ghadi13:05:07

as written, your fan-out has no backpressure

ghadi13:05:24

so you will launch a ton of threads, possibly overwhelming the system you're hitting

ghadi13:05:40

pipeline-async is a pipeline, so you can set a cap on the max in-flight

maleghast13:05:41

I am launching 17 requests

ghadi13:05:52

oh, ok. cardinalities weren't clear 🙂

maleghast13:05:10

but it would be very useful to know how to do backpressure for future reference

maleghast13:05:21

So... What I would like to be able to do is launch x HTTP requests concurrently and then return them as a lazy-seq, so that another process can read them

ghadi13:05:22

in general when you have this pattern, you need to know when no more work is to be submitted

ghadi13:05:34

in -> pipeline -> out

ghadi13:05:54

whoever is putting on in needs to async/close! that channel after all work is submitted

ghadi13:05:02

then pipeline will take care of closing out

ghadi13:05:08

in -> pipeline -> out -> async/into

ghadi13:05:30

and if you attach an async/into onto the back, then that will produce a vector when its input closes
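
A minimal sketch of that in -> pipeline-async -> out -> async/into shape (fetch-async and the inputs are made-up stand-ins for the real HTTP calls; the 5 is the in-flight cap that provides the backpressure):

(require '[clojure.core.async :as async])

;; Hypothetical stand-in for an async HTTP call: deliver one result onto
;; result-ch, then close it -- pipeline-async requires the async function
;; to close the per-value result channel.
(defn fetch-async [chunk result-ch]
  (future                                      ; simulate the client's callback thread
    (async/put! result-ch {:chunk chunk :status :ok})
    (async/close! result-ch)))

(let [in  (async/chan)
      out (async/chan)]
  (async/pipeline-async 5 out fetch-async in)  ; at most 5 requests in flight
  (async/onto-chan! in (range 17))             ; submit all the work, then close in
  (async/<!! (async/into [] out)))             ; blocks until out closes; yields the vector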

maleghast13:05:44

and async/into will give me back the collection

ghadi13:05:09

async/into returns a channel that yields the collection

ghadi13:05:29

(so you have to take from that channel to get the collection)

ghadi13:05:47

there are 3 flavors of pipeline (pipeline, pipeline-blocking, pipeline-async); you'll want pipeline-async

maleghast13:05:08

I think that I understand, like this: (async/<!! (async/into [] channel))

maleghast13:05:01

OK, thanks - I think that I get it.

ghadi13:05:31

if your use case is limited to http, you may have simpler code by submitting to a thread pool

ghadi13:05:37

like claypoole or whatever

maleghast13:05:57

I was considering doing that instead, yeah.

didibus07:06:41

(defn io
  "Simulating an IO operation by sleeping the calling thread
   for the given amount-of-time. Returns the amount-of-time."
  [amount-of-time]
  (Thread/sleep amount-of-time)
  amount-of-time)

(import 'java.util.concurrent.ExecutorCompletionService)
(import 'java.util.concurrent.Executors)

(defn do-concurrently
  "Executes each task in tasks with concurrency c, assuming side-effects,
   and run handler on their results as they complete. Handler is called
   synchronously from the calling thread."
  [tasks c handler]
  (let [executor (Executors/newFixedThreadPool c)
        cs (ExecutorCompletionService. executor)
        initial (take c tasks)
        remaining (drop c tasks)]
    ;; Submit initial batch of tasks to run concurrently.
    (doseq [task initial]
      (-> cs (.submit task)))
    (doseq [task remaining]
      ;; Block until any task completes.
      (let [result (-> cs .take .get)]
        ;; While tasks remain, submit another one to replace
        ;; the one that just completed.
        (-> cs (.submit task))
        ;; Handle the result of the task that just completed.
        (handler result)))
    ;; Since we submitted an initial batch, but only handled a remaining
    ;; number of tasks, some tasks are left un-handled, and we need to handle
    ;; them.
    (doseq [_ initial]
      (handler (-> cs .take .get)))
    ;; Shut down the executor once all tasks have been processed.
    (-> executor .shutdown)))

;;; Run io 10000 times at 10 ms per io call with up to 100 concurrent calls
;;; and sum up all results.
;;; Then print the time it took and the resulting sum.
(let [sum (atom 0)]
  (time
   (do-concurrently (repeat 10000 (partial io 10)) 100 #(swap! sum + %)))
  (println @sum))
The trick is that you first submit c tasks to be executed concurrently; in this case, I've chosen to make 100 concurrent calls at a time. The call to submit is non-blocking and returns immediately. After you've initiated the first batch, you block on cs, which waits until any of the tasks completes; when one does, it unblocks and returns that task's result. When that happens, we submit another task to maintain our concurrency level, and we call our handler with the result. In effect, we're saying: perform n calls, up to c at a time.

We handle the results on the thread that submits the remaining tasks, as they complete. This means that if our handler is very slow, it will delay the re-queuing of remaining tasks, so that's something to keep in mind. Finally, we have to handle the last batch of un-handled tasks, and shut down the executor to release the resources associated with it.

didibus07:06:36

As an example of doing it with executors.

maleghast13:06:27

Thanks @U0K064KQV - for now the problem has been solved by swapping core.async out for claypoole, but I am going to make a note of this for the future.

didibus15:06:44

Ya, I'm curious what the difference is with upmap from Claypoole. The code here processes things as soon as they're ready and always maxes out the concurrency, but it seems upmap does the same. Looking at the upmap code, I'm not 100% sure, but it seems like they've essentially implemented an ExecutorCompletionService of their own.

Eugen12:06:04

This is interesting. If you think Claypoole can be improved, please submit an issue and I will take a look. The project has moved to clj-commons, btw.

maleghast13:05:36

Thanks very much - I think I know what I need to know now...

otfrom13:05:26

I'm using upmap to limit the number of threads I'm working with (using claypoole), but my use case is very CPU-intensive data scrubbing and number crunching. My results end up in an (into [] ...) or get written to Parquet locally.