#core-async
2020-02-05
kenny18:02:58

Why does to-chan bound the returned channel's buffer size to 100?

hiredman18:02:38

it will perform better with a larger buffer, because more of the collection can just be dumped into the channel and you don't need as many go blocks

kenny18:02:36

If it performs better with a larger buffer, does that mean it shouldn't bound the buffer size at all?

kenny19:02:41

What do you mean by this "the more of the collection can just be dumped into the channel you don't need an go blocks"?

hiredman19:02:48

unbounded buffers behave very badly (and core.async doesn't let you have them out of the box)

hiredman19:02:06

channel operations (taking and putting) are synchronization points between processes. the timeline of a process putting to a channel has to sync up at some point with the timeline of a process taking from that channel.

hiredman19:02:08

but exactly synchronizing can be expensive in terms of processes waiting around for each other

hiredman19:02:11

but if you don't force some level of synchronization between takers and putters, you can easily have putters producing stuff faster than takers take it, which will result in filling up your memory with stuff

hiredman19:02:02

so a bounded buffer lets you operate on a channel without having to wait to perfectly synchronize processes, but the bound limits how out of sync they can get
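
To make the bound concrete, here is a small sketch (assuming core.async is required as async): with a fixed buffer of 2, a putter can complete two puts before any taker shows up, and no more.

```clojure
(require '[clojure.core.async :as async])

;; Sketch: a fixed buffer of 2 lets the putter run ahead of any taker
;; by at most two items before it has to wait.
(let [ch (async/chan 2)]
  (async/>!! ch :a)   ;; returns immediately; :a sits in the buffer
  (async/>!! ch :b)   ;; returns immediately; buffer is now full
  ;; a third >!! here would block until a taker drains the buffer
  [(async/<!! ch) (async/<!! ch)])   ;; => [:a :b]
```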

hiredman19:02:35

so for a small collection, the process (created by to-chan) that is feeding the collection into the channel can quickly dump the contents of the collection into the channel and exit, never having to run again, without having to wait to synchronize with some taker from the channel

hiredman18:02:47

but you might pass in an infinite seq

hiredman18:02:12

so it uses bounded-count: for small collections you will get a buffer size that matches the collection count, for larger collections you will get a buffer of 100
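
In code, the sizing hiredman describes looks roughly like this (a sketch of the idea, not the library source; bounded-count is clojure.core's, which counts at most n elements, so an infinite seq is safe):

```clojure
(require '[clojure.core.async :as async])

;; Sketch of how to-chan can size its buffer: count at most 100
;; elements (safe for infinite seqs), use that as the buffer size,
;; then feed the collection in and return the channel.
(defn to-chan-sketch [coll]
  (let [ch (async/chan (bounded-count 100 coll))]
    (async/onto-chan ch coll)
    ch))

;; (bounded-count 100 [1 2 3]) => 3, so a small collection's buffer
;; matches its count; (bounded-count 100 (range)) => 100.
```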

kenny19:02:05

Perhaps let me ask a more direct question about my problem... I have a collection of maps which describe HTTP requests. The collection typically varies in size from 10ish to a few thousand maps. I need to issue these requests with a maximum parallelism of ~10 to prevent API throttling as much as possible. I was thinking I'd do something like onto-chan the collection of requests and then call pipeline-async with the parallelism set to 10.

kenny19:02:51

Should I use onto-chan or to-chan for initializing this pipeline given the expected size of the collection?

kenny19:02:38

Something like this

(defn foo
  [{:keys [parallelism]} f coll]
  (let [from-ch (async/chan (count coll))
        to-ch (async/chan (count coll))]
    ;; place coll onto the from chan
    (async/onto-chan from-ch coll)

    (async/pipeline-async
      parallelism
      to-ch
      (fn [coll-item result-ch]
        (async/go
          (let [result (async/<! (f coll-item))]
            (async/>! result-ch result)
            (async/close! result-ch))))
      from-ch)

    (async/reduce
      (fn [acc x] (conj acc x))
      [] to-ch)))

ghadi19:02:26

buffer size of the source channel doesn't matter

ghadi19:02:40

you already have all the inputs in hand

kenny19:02:00

Oh, right.

ghadi19:02:08

so use to-chan and pass it as the input to pipeline

ghadi19:02:53

the buffer size of a channel affects how far ahead a producer can get in front of a consumer

ghadi19:02:06

in this case it doesn't matter

ghadi19:02:48

btw (async/reduce ... conj) === (async/into)
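
For reference, the two forms side by side (a small sketch):

```clojure
(require '[clojure.core.async :as async])

;; The reduce at the end of foo and async/into produce the same result:
(let [ch (async/to-chan [1 2 3])]
  (async/<!! (async/reduce conj [] ch)))   ;; => [1 2 3]

(let [ch (async/to-chan [1 2 3])]
  (async/<!! (async/into [] ch)))          ;; => [1 2 3]
```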

kenny19:02:08

Oh, nice!

kenny19:02:00

Next question... I'm testing out the above code and it appears 4 calls to af are made even though I set parallelism to 1. Are there some caveats with how pipeline-async works?

hiredman19:02:27

pipeline-async is almost never what you want

kenny19:02:24

Why? Is there some doc on this? I tried googling and there doesn't appear to be anything on how to choose between the 3 pipeline functions.

hiredman19:02:58

pipeline-async's parallelism limit behaves very weirdly

ghadi19:02:28

4 calls to af were made, but how many inputs did you pass in?

kenny19:02:40

(let [start-ms (System/currentTimeMillis)
      elapsed-s (fn []
                  (- (System/currentTimeMillis) start-ms))]
  (foo {:parallelism 1}
       (fn [n]
         (async/go
           (print (str (elapsed-s) " - send - " n " \n"))
           (flush)
           (async/<! (async/timeout (* n 1000)))
           n))
       (range 10)))

ghadi19:02:17

that doesn't show pipeline

hiredman19:02:36

in general, because of the internals of pipeline, the parallelism number is off by 2

hiredman19:02:48

but it is sort of only off by 2 from a certain point of view

kenny19:02:01

Hmm, ok. I don't think it really matters for my particular case. I'm more interested in how I'm supposed to choose the pipeline function. I'm issuing HTTP requests. The client I'm using returns a core async channel. The underlying operation is blocking but my interface is not.

hiredman19:02:20

you may as well just use pipeline and call f and take the result with <!!
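
i.e. something along these lines (a sketch based on the suggestion above; f is assumed to be the channel-returning request fn and 10 the desired parallelism, both placeholders):

```clojure
(require '[clojure.core.async :as async])

;; Sketch: use plain pipeline and block on f's result channel inside
;; the transducer. f and 10 stand in for the HTTP client fn and the
;; desired parallelism.
(defn run-requests [f coll]
  (let [to-ch (async/chan)]
    (async/pipeline 10 to-ch (map #(async/<!! (f %))) (async/to-chan coll))
    (async/<!! (async/into [] to-ch))))
```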

hiredman19:02:56

do you actually care about the order your http calls are being made in?

hiredman19:02:40

because the pipelines preserve order

kenny19:02:27

In this case I only care about the entire result set -- not streaming results. If I was "streaming" results, is there another pipeline function that does not preserve order?

Alex Miller (Clojure team)19:02:09

but you could just farm it out to an executor/thread pool/future in that case

kenny19:02:21

The reason to not farm it out to an executor being to stick within the core.async paradigm?

Alex Miller (Clojure team)19:02:01

you could still have your executor tasks throw stuff on a channel

Alex Miller (Clojure team)19:02:25

these are compatible things

kenny19:02:48

Perhaps I'm struggling to figure out when I should throw these operations onto an actual thread versus simply continuing to do everything within go blocks.

Ben Sless11:02:29

I was also bothered by the pipeline being blocked by the slowest task. Your recommendation is to just use a thread pool? The pipeline abstraction is very comfortable and simplifies architecture. Thread pools / executors less so

kenny19:02:14

If I'm taking with <!!, what thread is that blocking? Perhaps the question is where does xf run?

Alex Miller (Clojure team)19:02:46

there are multiple answers

Alex Miller (Clojure team)19:02:06

the xf may be run in either the consumer (<!!) or producer

Alex Miller (Clojure team)19:02:16

<!! blocks the thread that runs it

hiredman19:02:11

but pipeline and pipeline-blocking both run the xf on a dedicated thread

kenny19:02:12

How many of these dedicated threads are there?

hiredman19:02:10

the parallelism number passed in

kenny20:02:27

Doesn't this kind of defeat the purpose of using go blocks?

hiredman21:02:45

it is a whole thing

kenny21:02:27

I see. This seems very strange to me.

kenny19:02:56

The docstrings on the pipeline fns always confuse me: "If you have multiple blocking operations to put in flight, use pipeline-blocking instead. If you have multiple asynchronous operations to put in flight, use pipeline-async instead." Naively it sounds like pipeline-blocking is a better fit when using <!! @hiredman. Why do you suggest pipeline?

hiredman19:02:40

pipeline-blocking is maybe more future proof, but pipeline currently runs the exact same code as pipeline-blocking

hiredman19:02:37

something like

(defn g [n f requests output]
  (let [tokens (async/chan n)
        complete (async/chan n)]
    (dotimes [i n]
      (async/put! tokens true))
    (doseq [r requests]
      (async/go
        (async/<! tokens)
        (async/>! output (async/<! (f r)))
        (async/>! complete true)))
    (async/go-loop [x n]
      (async/<! complete)
      (if (zero? n)
        (async/close! output)
        (do
          (async/>! tokens true)
          (recur (dec n)))))))
will limit the number of requests in flight, but run them out of order in case request 0 is really slow for some reason. (there might be an off by one error in there)

kenny19:02:54

This was for disregarding order?

hiredman19:02:03

yes, it will run the requests in whatever order and you will get results in whatever order

hiredman19:02:58

oh, even worse

hiredman19:02:08

the references to n in that last loop should be x