This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-05-31
Channels
- # announcements (8)
- # babashka (8)
- # beginners (13)
- # biff (1)
- # calva (1)
- # cider (12)
- # clj-kondo (16)
- # cljs-dev (3)
- # cljsrn (14)
- # clojure (18)
- # clojure-austin (2)
- # clojure-czech (3)
- # clojure-europe (54)
- # clojure-germany (5)
- # clojure-nl (1)
- # clojure-norway (2)
- # clojure-spec (4)
- # clojure-survey (2)
- # clojure-uk (1)
- # clojured (15)
- # clojurescript (5)
- # conjure (6)
- # core-async (65)
- # cursive (24)
- # data-science (1)
- # emacs (9)
- # events (3)
- # graphql (5)
- # integrant (6)
- # jobs (2)
- # joyride (62)
- # lsp (5)
- # malli (10)
- # off-topic (20)
- # pathom (57)
- # polylith (18)
- # re-frame (12)
- # remote-jobs (3)
- # rewrite-clj (14)
- # sci (2)
- # shadow-cljs (41)
- # sql (9)
- # tools-deps (68)
Hi... Does anyone know how to drain a Core Async channel into a list or better yet a lazy-sequence so that the results of a channel can be passed to a function that needs a list or lazy-seq?
@ghadi - I need the vector AND I need to wait until all the calls placed on the channel have completed.
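(For reference, a minimal sketch of draining a channel into a vector with async/into, assuming org.clojure/core.async is on the classpath. The key constraint is that the source channel must be closed before the result is delivered:)

(require '[clojure.core.async :as async])

;; async/into accumulates everything taken from a channel and delivers
;; the collection once the source channel closes.
(let [ch (async/chan 3)]
  (async/>!! ch 1)
  (async/>!! ch 2)
  (async/>!! ch 3)
  (async/close! ch)                     ; without close!, <!! would block forever
  (async/<!! (async/into [] ch)))
;; => [1 2 3]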
(defn get-conversion-data-backfill
  "Function to organise the backfill and execute it"
  [affiliate-site-id start-date]
  (let [request-chan (async/chan)
        chunks (get-backfill-chunk-start-end-dates (:since @*params))]
    (doseq [chunk chunks]
      (thread
        (async/>!!
         request-chan
         (get-conversion-data
          affiliate-site-id
          (first chunk)
          (validate-end-date (last chunk))))))
    (go
      (while true
        (let [report-map (async/<! request-chan)
              out '()]
          (conj out (:result report-map))
          out)))))
whenever you <! take from a channel, it either returns a value, or nil, indicating the channel has closed
actually you only have one channel here, you're putting and taking from the request channel
@maleghast claypoole might do what you want as well, tho core.async fits as well depending on the use case
I'm using claypoole for embarrassingly parallel things, but core.async is probably better if your problem is actually concurrent
I think that based on the situation I am in I might convert to claypoole as that's the way things are normally done around here...
Initially I was pulling things off the channel and writing to files, but to fit in with existing approaches I need the results to be returned as a lazy-seq or a list
I'd be into understanding better when to choose pipeline-async and when to choose claypoole. In my case knowing that the order didn't matter is what switched me
So... What I would like to be able to do is launch x HTTP requests concurrently and then return them as a lazy-seq, so that another process can read them
in general when you have this pattern, you need to know when no more work is to be submitted
whoever is putting on in needs to async/close! that channel after all work is submitted
and if you attach an async/into onto the back, then that will produce a vector when its input closes
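(One way to sketch that close-then-collect pattern, assuming core.async is on the classpath; run-all is a hypothetical helper name. async/thread returns a channel that closes when its body finishes, and async/merge closes its output once all inputs close, so no manual close! bookkeeping is needed:)

(require '[clojure.core.async :as async])

(defn run-all
  "Runs each no-arg task on its own thread, blocks until every result
  is available, and returns them as a vector in completion order."
  [tasks]
  (->> tasks
       (mapv #(async/thread (%)))   ; one channel per task, closes when done
       async/merge                  ; closes output when all inputs close
       (async/into [])              ; delivers the vector on close
       async/<!!))

;; e.g. (run-all [#(+ 1 1) #(* 3 3)]) returns [2 9] in some order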
if your use case is limited to http, you may have simpler code by submitting to a thread pool
(defn io
  "Simulating an IO operation by sleeping the calling thread
  for the given amount-of-time. Returns the amount-of-time."
  [amount-of-time]
  (Thread/sleep amount-of-time)
  amount-of-time)

(import 'java.util.concurrent.ExecutorCompletionService)
(import 'java.util.concurrent.Executors)

(defn do-concurrently
  "Executes each task in tasks with concurrency c, assuming side-effects,
  and runs handler on their results as they complete. Handler is called
  synchronously from the calling thread."
  [tasks c handler]
  (let [executor (Executors/newFixedThreadPool c)
        cs (ExecutorCompletionService. executor)
        initial (take c tasks)
        remaining (drop c tasks)]
    ;; Submit the initial batch of tasks to run concurrently.
    (doseq [task initial]
      (-> cs (.submit task)))
    (doseq [task remaining]
      ;; Block until any task completes.
      (let [result (-> cs .take .get)]
        ;; While tasks remain, submit another one to
        ;; replace the one that just completed.
        (-> cs (.submit task))
        ;; Handle the result of the task that just completed.
        (handler result)))
    ;; Since we submitted an initial batch, but only handled a remaining
    ;; number of tasks, some tasks are left un-handled, and we need to
    ;; handle them.
    (doseq [_ initial]
      (handler (-> cs .take .get)))
    ;; Shut down the executor once all tasks have been processed.
    (-> executor .shutdown)))
;;; Run io 10000 times at 10 ms per io call with up to 100 concurrent calls
;;; and sum up all results.
;;; Then print the time it took and the resulting sum.
(let [sum (atom 0)]
  (time
   (do-concurrently (repeat 10000 (partial io 10)) 100 #(swap! sum + %)))
  (println @sum))
The trick is that you first submit c number of tasks to be executed concurrently. In this case, I've chosen to make 100 concurrent calls at a time. The call to submit is non-blocking and will return immediately. After you've initiated your first batch, you block on cs, which will wait till any of the tasks complete; when one does, it will unblock and return the result of the task that just completed. When that happens, we submit another task, so that we maintain our concurrency level, and we call our handler with the result. In effect, we're saying: perform n calls, up to c at a time. We handle the results on the thread which submits the remaining tasks as they complete. This means that if our handler is very slow, it will delay our re-queuing of remaining tasks, so that's something to keep in mind. Finally, we have to handle the remaining batch of un-handled tasks, and shut down the executor to release the resources associated with it.
Thanks @U0K064KQV - for now the problem has been solved by swapping core.async out for claypoole, but I am going to make a note of this for the future.
Ya, I'm curious what the difference is with upmap from Claypoole. The code above processes things as soon as they are ready and always maxes out the concurrency, but it seems upmap does the same. Looking at the upmap code, I'm not 100% sure, but it seems like they kind of implemented an ExecutorCompletionService on their own.
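(For comparison, a sketch of the Claypoole version, assuming com.climate/claypoole is on the classpath; io is the sleep helper from above:)

(require '[com.climate.claypoole :as cp])

(defn io [ms] (Thread/sleep ms) ms)

;; upmap is an unordered pmap over a fixed-size pool: results come back
;; in completion order and the pool stays saturated, much like the
;; ExecutorCompletionService loop above.
(cp/with-shutdown! [pool (cp/threadpool 100)]
  (reduce + (cp/upmap pool io (repeat 1000 10))))
;; => 10000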