This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-02-05
Channels
- # announcements (17)
- # architecture (5)
- # babashka (12)
- # beginners (155)
- # calva (18)
- # chlorine-clover (2)
- # cider (57)
- # circleci (2)
- # clojure (151)
- # clojure-europe (4)
- # clojure-gamedev (20)
- # clojure-italy (18)
- # clojure-nl (4)
- # clojure-norway (3)
- # clojure-spec (8)
- # clojure-uk (95)
- # clojurescript (70)
- # core-async (68)
- # css (3)
- # data-science (13)
- # datascript (1)
- # datomic (16)
- # docker (2)
- # figwheel-main (41)
- # fulcro (34)
- # graalvm (6)
- # graphql (7)
- # jobs (14)
- # joker (2)
- # kaocha (1)
- # leiningen (2)
- # malli (3)
- # midje (2)
- # overtone (1)
- # reagent (8)
- # reitit (6)
- # ring-swagger (1)
- # schema (2)
- # shadow-cljs (6)
- # spacemacs (3)
- # specter (5)
- # timbre (3)
- # uncomplicate (1)
it will perform better the larger the buffer, because more of the collection can just be dumped into the channel and you don't need as many go blocks
Would performing better with a larger buffer mean it shouldn't bound the buffer size?
What do you mean by this "the more of the collection can just be dumped into the channel you don't need an go blocks"?
unbounded buffers behave very badly (and core.async doesn't let you have them out of the box)
channel operations (taking and putting) are synchronization points between processes. the timeline of a process putting to a channel has to sync up at some point with the timeline of a process taking from that channel.
but exactly synchronizing can be expensive in terms of processes waiting around for each other
but if you don't force some level of synchronization between takers and putters, you can easily have putters producing stuff faster than takers can take it, which will fill up your memory
so a bounded buffer lets you operate on a channel without having to wait to perfectly synchronize processes, but the bound limits how out of sync they can get
so for a small collection, the process (created by to-chan) that is feeding the collection into the channel can quickly dump the contents of the collection into the channel and exit, never having to run again, without having to wait to synchronize with some taker from the channel
so to-chan uses bounded-count: for small collections you will get a buffer size that matches the collection count, and for larger collections you will get a buffer of 100
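A small sketch of that sizing behavior (illustrative; assumes core.async is on the classpath):

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

;; bounded-count walks at most n elements of a non-counted seq, so it
;; is safe even on infinite lazy seqs; to-chan uses it to size the buffer
(bounded-count 100 (range))   ;; => 100
(bounded-count 100 [1 2 3])   ;; => 3

;; for a small collection the buffer holds everything, so the feeding
;; process dumps the contents and exits without waiting on a taker
(def c (async/to-chan (range 5)))
(<!! (async/into [] c))       ;; => [0 1 2 3 4]
```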
Perhaps let me ask a more direct question about my problem... I have a collection of maps which describe HTTP requests. The collection typically varies in size from 10ish to a few thousand maps. I need to issue these requests with a maximum parallelism of ~10 to prevent API throttling as much as possible. I was thinking I'd onto-chan the collection of requests and then call pipeline-async with the parallelism set to 10. Should I use onto-chan or to-chan for initializing this pipeline given the expected size of the collection?
Something like this
(defn foo
  [{:keys [parallelism]} f coll]
  (let [from-ch (async/chan (count coll))
        to-ch   (async/chan (count coll))]
    ;; place coll onto the from chan
    (async/onto-chan from-ch coll)
    (async/pipeline-async
     parallelism
     to-ch
     (fn [coll-item result-ch]
       (async/go
         (let [result (async/<! (f coll-item))]
           (async/>! result-ch result)
           (async/close! result-ch))))
     from-ch)
    (async/reduce
     (fn [acc x] (conj acc x))
     [] to-ch)))
the buffer size of a channel affects how far ahead a producer can get in front of a consumer
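A quick illustration of that (offer! is a non-blocking put that returns true on success and nil when the buffer is full):

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

(def c (async/chan 2))

(async/offer! c :a) ;; => true  (fills buffer slot 1)
(async/offer! c :b) ;; => true  (fills buffer slot 2)
(async/offer! c :c) ;; => nil   (buffer full; a blocking put would park here)

(<!! c)             ;; => :a    (a take frees a slot...)
(async/offer! c :c) ;; => true  (...so the producer can get ahead again)
```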
Next question... I'm testing out the above code and it appears 4 calls to af are made even though I set parallelism to 1. Are there some caveats to how pipeline-async works?
Why? Is there some doc on this? I tried googling and there doesn't appear to be anything on how to choose between the 3 pipeline functions.
(let [start-ms  (System/currentTimeMillis)
      elapsed-s (fn []
                  (- (System/currentTimeMillis) start-ms))]
  (foo {:parallelism 1}
       (fn [n]
         (async/go
           (print (str (elapsed-s) " - send - " n " \n"))
           (flush)
           (async/<! (async/timeout (* n 1000)))
           n))
       (range 10)))
in general, because of the internals of pipeline, the parallelism number is off by 2
Hmm, ok. I don't think it really matters for my particular case. I'm more interested in how I'm supposed to choose the pipeline function. I'm issuing HTTP requests. The client I'm using returns a core async channel. The underlying operation is blocking but my interface is not.
In this case I only care about the entire result set -- not streaming results. If I was "streaming" results, is there another pipeline function that does not preserve order?
but you could just farm it out to an executor/thread pool/future in that case
The reason to not farm it out to an executor being to stick within the core.async paradigm?
you could still have your executor tasks throw stuff on a channel
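For example, async/thread gives you exactly that shape out of the box: it runs a (potentially blocking) body on a separate cached thread pool and hands back a channel carrying the result. blocking-op here is a hypothetical stand-in for a blocking HTTP call:

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

(defn blocking-op
  "Hypothetical stand-in for a blocking HTTP call."
  [n]
  (Thread/sleep 10)
  (* n n))

;; the body runs on core.async's pool for blocking work; the returned
;; channel receives the body's result and is then closed
(def result-ch (async/thread (blocking-op 7)))

(<!! result-ch) ;; => 49
```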
Perhaps I'm struggling to figure out when I should throw these operations onto an actual thread versus simply continuing to do everything within go blocks.
I was also bothered by the pipeline being blocked by the slowest task. Your recommendation is to just use a thread pool? The pipeline abstraction is very comfortable and simplifies architecture. Thread pools / executors less so
If I'm taking with <!!, what thread is that blocking? Perhaps the question is: where does the xf run?
there are multiple answers
the xf may be run in either the consumer (<!!) or producer
<!! blocks the thread that runs it
The docstrings on the pipeline fns always confuse me. > If you have multiple blocking operations to put in flight, use pipeline-blocking instead. If you have multiple asynchronous operations to put in flight, use pipeline-async instead. Naively it sounds like pipeline-blocking is a better fit when using <!! @hiredman. Why do you suggest pipeline?
pipeline-blocking is maybe more future proof, but pipeline currently runs the exact same code as pipeline-blocking
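So a pipeline-blocking version of the earlier foo might look something like this sketch (fake-request is a hypothetical stand-in for the real client; note that pipeline-blocking takes a transducer rather than an async f):

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

(defn fake-request
  "Hypothetical stand-in for a blocking HTTP request."
  [m]
  (Thread/sleep 5)
  (assoc m :status 200))

(let [from (async/to-chan (map (fn [i] {:id i}) (range 20)))
      to   (async/chan 20)]
  ;; at most 10 requests run concurrently, each on its own thread;
  ;; results arrive on `to` in input order (pipelines preserve order)
  (async/pipeline-blocking 10 to (map fake-request) from)
  (<!! (async/into [] to)))
```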
something like
(defn g [n f requests output]
  (let [tokens   (async/chan n)
        complete (async/chan n)]
    ;; seed n tokens; each in-flight request holds one
    (dotimes [i n]
      (async/put! tokens true))
    (doseq [r requests]
      (async/go
        (async/<! tokens)
        (async/>! output (async/<! (f r)))
        (async/>! complete true)))
    ;; wait for one completion per request, then close the output
    (async/go-loop [x (count requests)]
      (if (zero? x)
        (async/close! output)
        (do
          (async/<! complete)
          (async/>! tokens true)
          (recur (dec x)))))))
will limit the number of requests in flight, but run them out of order in case request 0 is really slow for some reason. (there might be an off by one error in there)