#core-async
2018-05-17
xiongtx01:05:39

When I run the following:

(let [from-chan (async/to-chan (drop 1 (range)))
      to-chan (async/chan 10)]
  (async/pipeline-async 1 to-chan
                        (fn [v c]
                          (if (> v 10)
                            (do
                              (>!! c :end)
                              (async/close! c))
                            (do
                              (>!! c v)
                              (async/close! c))))
                        from-chan)
  (println (<!! (async/reduce (fn [acc v]
                                (println v)
                                (if (= :end v)
                                  (do (async/close! to-chan)
                                      acc)
                                  (inc acc))) 0 to-chan))))
I get something like:
1
2
3
4
5
6
7
8
9
10
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
It seems more elements of the input are being consumed than I expect (note the large number of :ends). What's going on here? Chunking? Or something else?

xiongtx01:05:05

Basically, I want to perform (async, though the example isn't) operations w/ a seq of increasing integers, stop processing when some integer returns a certain value, and put results until the stopping integer into another channel.

noisesmith01:05:09

is the number of :end results equal to your channel buffer size, even if you change the buffer?

noisesmith01:05:32

I didn't count, but I'm suspicious

xiongtx01:05:43

Hmm, yeah, it does seem to vary w/ the buffer size of to-chan

noisesmith01:05:37

yeah, I think it's an interaction between the buffering and channel-closing behaviors
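[One way to test that hunch is to parameterize the buffer size and count the :ends. A sketch, not from the thread: `end-count` is a hypothetical helper, it uses put! in the af (pipeline-async expects af to return immediately), and the counts are timing-dependent:]

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

;; Count how many :ends arrive for a given to-chan buffer size.
(defn end-count [buf-size]
  (let [from-chan (async/to-chan (drop 1 (range)))
        to-chan   (async/chan buf-size)]
    (async/pipeline-async 1 to-chan
                          (fn [v c]
                            ;; put! instead of >!! -- af should not block
                            (async/put! c (if (> v 10) :end v))
                            (async/close! c))
                          from-chan)
    ;; reduce drains to-chan; closing it on the first :end still lets
    ;; already-buffered :ends come through, which is what we count
    (<!! (async/reduce (fn [n v]
                         (when (= :end v)
                           (async/close! to-chan))
                         (if (= :end v) (inc n) n))
                       0 to-chan))))
```

[Comparing e.g. `(end-count 1)` against `(end-count 10)` should show the :end count tracking the buffer size.]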

xiongtx07:05:05

@tbaldridge Got a question about pipeline. The doc says:

> Will stop consuming the from channel if the to channel closes.

But when I run:

(let [from-chan (async/to-chan (range))
      to-chan (async/chan 1)]
  (async/pipeline 1 to-chan (map #(doto % println)) from-chan)
  (async/close! to-chan)
  (<!! to-chan))
Consumption of from-chan doesn't stop. Am I missing something here?

Alex Miller (Clojure team)12:05:32

You put an infinite range on from-chan ?

tbaldridge13:05:57

@xiongtx there's already one pending item in the pipeline, try taking a few from it and see if it clears out after a couple

xiongtx14:05:57

@alexmiller I see a similar result w/ e.g. (take 100 (range)). Values 0 to 99 are printed out. In my mind, this shouldn't happen, since we only pulled a single value from to-chan (which has a buffer of 1). So where are all the processed values being put?

Alex Miller (Clojure team)14:05:38

pipeline spins up a bunch of machinery behind the scenes. There is a go block that will read from from-chan while values are ready (in this case, you will have as many as 100 values on the channel and ready to go). For each input value, it creates a 1-slot output channel and then enqueues the input value / output channel pair into a jobs channel (sized with n, here 1). There are n (here 1) go blocks that pull from that jobs channel, compute the result and put it in the value-specific output channel. And there is a go block that pulls from a results channel of all the output channels, pushing values into the to-chan as able.
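[The machinery described above can be sketched like this. This is an illustrative simplification, not the actual core.async source; `f` stands in for the transducer step, and the channel names are made up:]

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <! >! close!]])

;; Simplified sketch of pipeline's internal wiring.
(defn pipeline-sketch [n to f from]
  (let [jobs    (chan n)   ; [input-value output-chan] pairs
        results (chan n)]  ; the 1-slot output channels, in input order
    ;; producer: reads from while values are ready, pairing each input
    ;; with a fresh 1-slot output channel
    (go-loop []
      (if-some [v (<! from)]
        (let [out (chan 1)]
          (>! jobs [v out])
          (>! results out)
          (recur))
        (do (close! jobs)
            (close! results))))
    ;; n workers: compute each result, put it on the job-specific channel
    (dotimes [_ n]
      (go-loop []
        (when-some [[v out] (<! jobs)]
          (>! out (f v))
          (close! out)
          (recur))))
    ;; consumer: drain output channels into to, preserving input order
    (go-loop []
      (if-some [out (<! results)]
        (do (>! to (<! out))
            (recur))
        (close! to)))))
```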

Alex Miller (Clojure team)14:05:30

So because there is work to do, the consumer go block will be pulling those as fast as it can and making jobs

Alex Miller (Clojure team)14:05:57

The (1) worker go block will take jobs off the jobs channel, make the result and enqueue in the results channel

Alex Miller (Clojure team)14:05:35

The output go block will take one result out and feed it to to-chan, then wait for space to become available in the to-chan buffer

Alex Miller (Clojure team)15:05:25

the results channel is also sized to n (here 1) so everything will be backed up for the jobs

Alex Miller (Clojure team)15:05:04

however, transducers may be applied on the calling thread in some cases and I think that’s what’s happening here

Alex Miller (Clojure team)15:05:57

the first go block that is building the transducer chans is effectively doing the work when it creates the job result chan

Alex Miller (Clojure team)15:05:39

that is confusing, since the worker go block (which is controlled by the parallelism) is not actually the one doing the work. it’s been a while since I thought about this and I’m trying to recall what controls the front-end execution and whether this would always occur or under what conditions

Alex Miller (Clojure team)15:05:22

you’re definitely also affected by setting up the scenario to have 100 pre-available items due to using to-chan

Alex Miller (Clojure team)15:05:30

hmm, looking at the code again, I’ll go back on some of that. the first go block is not creating the channel with xf; that happens in the worker go block, so that’s good

hiredman15:05:28

a pipeline ends up acting like a buffer with a size like input-buffer + output-buffer + parallelism = buffer-size

hiredman16:05:33

so with the given code, the pipeline is like a buffer of size 102, I think

hiredman16:05:15

(and pipeline-async has entirely different buffering behavior)

Alex Miller (Clojure team)16:05:08

before the first take on to-chan, you can have values “buffered” in to-chan (1), 1 value closed over in the output go block, one buffered in the result channel (really N), and 1 value closed over in the processor go (really N). so it’s really 1 + 2N + take buffer, something like that (assuming you always have input values available).

Alex Miller (Clojure team)16:05:39

here before you even do the first take, you’ll have 4 values get processed
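[That limited read-ahead is observable by counting transducer invocations before any take. A sketch, not from the thread; the sleep and the exact count are timing-dependent:]

```clojure
(require '[clojure.core.async :as a])

(let [processed (atom 0)
      from      (a/to-chan (range 100))
      to        (a/chan 1)]
  (a/pipeline 1 to (map (fn [v] (swap! processed inc) v)) from)
  (Thread/sleep 100)   ; just let the internal go blocks settle
  @processed)
;; => a small number (~4 with n = 1), not 100 -- unless to gets closed
```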

Alex Miller (Clojure team)16:05:18

however, the close! is the spoiler - closing prevents new values from being put on the to-chan

Alex Miller (Clojure team)16:05:05

but the out go is ignoring that feedback and just continuously spitting things at the to-chan and failing every time

Alex Miller (Clojure team)16:05:29

so that output go block should probably be checking the return value of >!

Alex Miller (Clojure team)16:05:24

I’d say “Will stop consuming the from channel if the to channel closes.” is a lie right now

Alex Miller (Clojure team)16:05:52

it would be more accurate to say “will mercilessly consume the from channel if the to channel closes”

noisesmith16:05:21

haha, wow, good catch

Alex Miller (Clojure team)16:05:41

actually, it does check the return value of >! but there are two (recur) s in that block

Alex Miller (Clojure team)16:05:08

oh, nvm that’s the outer recur

noisesmith16:05:26

so is this a bug or no?

Alex Miller (Clojure team)16:05:23

yeah, that’s what happens, it does notice it can’t write a particular value, but it doesn’t stop trying to read the next value

Alex Miller (Clojure team)16:05:43

the code is definitely not doing what the docstring says, so I’d say it’s a bug

xiongtx17:05:39

@alexmiller You're referring to the close! here? https://github.com/clojure/core.async/blob/core.async-0.4.474/src/main/clojure/clojure/core/async.clj#L527 But we'd escape the go-loop's recur if (nil? p), no?

Alex Miller (Clojure team)17:05:02

no, that’s the check for whether the input is closed

Alex Miller (Clojure team)17:05:23

we’re talking about whether the output (`to`) is closed

Alex Miller (Clojure team)17:05:48

that >! returns false if it’s closed

Alex Miller (Clojure team)17:05:00

which breaks the inner recur, but not the outer recur

Alex Miller (Clojure team)17:05:18

so that go loop just keeps merrily reading values from the pipeline
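[For reference, the output go-loop under discussion looks roughly like this (paraphrased from the linked source), with the two recurs marked:]

```clojure
(go-loop []
  (let [p (<! results)]
    (if (nil? p)
      (when close? (close! to))
      (let [res (<! p)]
        (loop []
          (let [v (<! res)]
            (when (and (not (nil? v)) (>! to v))
              (recur))))   ; inner recur: stops when >! returns false
        (recur)))))        ; outer recur: keeps taking from results anyway
```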

xiongtx17:05:22

Ah, right. I think we need to check whether to is closed at the top of the go-loop, and break out if so.

Alex Miller (Clojure team)17:05:15

well the first question is, what should the behavior be? that is, is the docstring right.

Alex Miller (Clojure team)17:05:31

there is no public api to check whether a channel is closed

Alex Miller (Clojure team)17:05:07

if we were to stop reading, we should pay attention to the return value of <!

Alex Miller (Clojure team)17:05:30

and not just break the inner recur, but the outer recur, and probably send a signal back through the pipeline channels

Alex Miller (Clojure team)17:05:00

that may involve several changes

xiongtx17:05:42

> there is no public api to check whether a channel is closed

Does it need to be public? There's impl/closed? for use inside clojure.core.async itself.

noisesmith17:05:27

well, it's not public for a reason - there's no way to write non-race-condition code that uses closed?

xiongtx17:05:15

> there's no way to write non-race-condition code that uses closed?

B/c another thread could've closed the channel b/t the check and some subsequent action?

Alex Miller (Clojure team)17:05:30

it’s not public because you generally shouldn’t do that

noisesmith17:05:06

well I should qualify that I guess - it can give a true positive, but every negative is ambiguous
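[In other words, closed? is a classic check-then-act race. A sketch (racy-put! is a made-up helper; impl/closed? is internal, not public API):]

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

(defn racy-put! [ch v]
  (when-not (impl/closed? ch)  ; a "not closed" answer is already stale...
    ;; ...another thread can close! ch right here...
    (a/put! ch v)))            ; ...so the put may still be dropped
```

[A true answer from closed? is reliable; a false one can be invalidated before you act on it. Only the return value of the put itself is authoritative.]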

xiongtx17:05:15

So I just did a

(go-loop []
  (when-not (impl/closed? to)
    (let [p (<! results)]
      (if (nil? p)
        (when close? (close! to))
        (let [res (<! p)]
          (loop []
            (let [v (<! res)]
              (when (and (not (nil? v)) (>! to v))
                (recur))))
          (recur))))))
And now
(let [from (a/to-chan (range 100))
      to (a/chan 1)]
  (a/pipeline 1 to (map #(doto % println)) from)
  (a/close! to))
;; 1
;; 2

Alex Miller (Clojure team)17:05:00

I think it would be far preferable to respond to the return value of >!

👍 4