This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2018-05-17
Channels
- # architecture (14)
- # aws (4)
- # aws-lambda (2)
- # beginners (66)
- # cider (63)
- # clara (39)
- # cljsjs (4)
- # cljsrn (3)
- # clojure (111)
- # clojure-berlin (5)
- # clojure-italy (17)
- # clojure-losangeles (1)
- # clojure-nl (4)
- # clojure-uk (93)
- # clojurescript (19)
- # core-async (60)
- # cursive (13)
- # datomic (22)
- # devcards (2)
- # dirac (4)
- # duct (44)
- # emacs (18)
- # fulcro (1)
- # graphql (10)
- # jobs (13)
- # jobs-discuss (27)
- # lumo (1)
- # mount (1)
- # off-topic (22)
- # om-next (1)
- # onyx (16)
- # philosophy (3)
- # planck (4)
- # precept (34)
- # re-frame (66)
- # reagent (6)
- # ring (2)
- # ring-swagger (1)
- # shadow-cljs (333)
- # specter (8)
- # tools-deps (4)
- # vim (15)
- # yada (1)
When I run the following:
(let [from-chan (async/to-chan (drop 1 (range)))
      to-chan   (async/chan 10)]
  (async/pipeline-async 1 to-chan
                        (fn [v c]
                          (if (> v 10)
                            (do (>!! c :end)
                                (async/close! c))
                            (do (>!! c v)
                                (async/close! c))))
                        from-chan)
  (println (<!! (async/reduce (fn [acc v]
                                (println v)
                                (if (= :end v)
                                  (do (async/close! to-chan)
                                      acc)
                                  (inc acc)))
                              0
                              to-chan))))
I get something like:
1
2
3
4
5
6
7
8
9
10
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
:end
It seems more elements of the input are being consumed than I expect (note the large number of :ends). What's going on here? Chunking? Or something else?
Basically, I want to perform (async, though the example isn't) operations w/ a seq of increasing integers, stop processing when some integer returns a certain value, and put results up to the stopping integer into another channel.
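One way to get that behavior without an :end sentinel is to put the cutoff in a transducer on the destination channel: when take-while terminates, its reduced value closes the channel. A minimal sketch (the finite source is deliberate, since pipeline's behavior once the to channel closes is discussed further down):

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

;; Sketch: take-while's terminating `reduced` closes to-chan, so no
;; :end sentinel or manual close! is needed on the consumer side.
(let [from-chan (async/to-chan (drop 1 (range 100)))
      to-chan   (async/chan 10 (take-while #(<= % 10)))]
  (async/pipeline 1 to-chan (map identity) from-chan)
  (<!! (async/reduce (fn [acc _] (inc acc)) 0 to-chan)))
;; counts the values 1..10 that make it through take-while
```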
Is the number of :end results equal to your channel buffer size, even if you change the buffer?
I didn't count, but I'm suspicious
yeah, I think it's an interaction between the buffering and channel-closing behaviors
@tbaldridge Got a question about pipeline. The doc says:
> Will stop consuming the from channel if the to channel closes.
But when I run:
(let [from-chan (async/to-chan (range))
      to-chan   (async/chan 1)]
  (async/pipeline 1 to-chan (map #(doto % println)) from-chan)
  (async/close! to-chan)
  (<!! to-chan))
Consumption of from-chan doesn't stop. Am I missing something here?
You put an infinite range on from-chan?
@xiongtx there's already one pending item in the pipeline, try taking a few from it and see if it clears out after a couple
@alexmiller I see a similar result w/ e.g. (take 100 (range)). Values 0 to 99 are printed out.
In my mind, this shouldn't happen, since we only pulled a single value from to-chan (which has a buffer of 1). So where are all the processed values being put?
pipeline spins up a bunch of machinery behind the scenes. There is a go block that will read from from-chan while values are ready (in this case, you will have as many as 100 values on the channel and ready to go). For each input value, it creates a 1-slot output channel and then enqueues the input value / output channel pair into a jobs channel (sized with n, here 1). There are n (here 1) go blocks that pull from that jobs channel, compute the result and put it in the value-specific output channel. And there is a go block that pulls from a results channel of all the output channels, pushing values into the to-chan as able.
So because there is work to do, the consumer go block will be pulling those as fast as it can and making jobs
The (1) worker go block will take jobs off the stack, make the result and enqueue in the results channel
The output go block will take one result out and feed it to to-chan, then wait for space to become available in the to-chan buffer
the results channel is also sized to n (here 1) so everything will be backed up for the jobs
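A rough model of the machinery described above, with a jobs channel and results channel each sized n, n worker go blocks, and an output go block. All names are illustrative, and f is a plain function rather than a transducer; this is not core.async's actual implementation:

```clojure
(require '[clojure.core.async :as async
           :refer [chan go-loop <! >! close!]])

(defn sketch-pipeline
  "Toy model of pipeline's internal plumbing."
  [n to f from]
  (let [jobs    (chan n)
        results (chan n)]
    ;; Front go block: pair each input with a 1-slot result channel.
    (go-loop []
      (if-some [v (<! from)]
        (let [res (chan 1)]
          (>! jobs [v res])
          (>! results res)   ;; preserves input order for the consumer
          (recur))
        (do (close! jobs) (close! results))))
    ;; n worker go blocks compute results off the jobs channel.
    (dotimes [_ n]
      (go-loop []
        (when-some [[v res] (<! jobs)]
          (>! res (f v))
          (close! res)
          (recur))))
    ;; Output go block: drain result channels, in order, into `to`.
    ;; Like the behavior discussed below, it ignores >!'s return
    ;; value, so it keeps reading results even if `to` closes.
    (go-loop []
      (if-some [res (<! results)]
        (do (when-some [v (<! res)]
              (>! to v))
            (recur))
        (close! to)))))
```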
however, transducers may be applied on the calling thread in some cases and I think that’s what’s happening here
the first go block that is building the transducer chans is effectively doing the work when it creates the job result chan
that is confusing, since the worker go block (which is controlled by the parallelism) is not actually the one doing the work. it’s been a while since I thought about this and I’m trying to recall what controls the front-end execution and whether this would always occur or under what conditions
you’re definitely also affected by setting up the scenario to have 100 pre-available items due to using to-chan
hmm, looking at the code again, I’ll go back on some of that. the first go block is not creating the channel with xf; that happens in the worker go block, so that’s good
a pipeline ends up acting like a buffer with a size like input-buffer + output-buffer + parallelism = buffer-size
I looked in to this a bit https://ce2144dc-f7c9-4f54-8fb6-7321a4c318db.s3.amazonaws.com/buffer.html
before the first take on to-chan, you can have values “buffered” in to-chan (1), 1 value closed over in the output go block, one buffered in the result channel (really N), and 1 value closed over in the processor go (really N). so it’s really 1 + 2N + take buffer, something like that (assuming you always have input values available).
here before you even do the first take, you’ll have 4 values get processed
1 + 2*1 + 1
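That head start can be made visible by counting how many inputs the machinery touches before any take happens (seen is an illustrative name, and the exact count depends on timing, so treat the number as approximate):

```clojure
(require '[clojure.core.async :as async])

(let [seen (atom 0)
      from (async/to-chan (range 100))
      to   (async/chan 1)]
  (async/pipeline 1 to (map (fn [v] (swap! seen inc) v)) from)
  (Thread/sleep 100)   ;; give the go blocks time to fill up
  @seen)
;; roughly 1 + 2N + buffer = 4 here, before any take on `to`
```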
however, the close! is the spoiler - closing prevents new values from being put on the to-chan
but the out go is ignoring that feedback and just continuously spitting things at the to-chan and failing every time
so that output go block should probably be checking the return value of >!
I’d say “Will stop consuming the from channel if the to channel closes.” is a lie right now
it would be more accurate to say “will mercilessly consume the from channel if the to channel closes”
haha, wow, good catch
actually, it does check the return value of >! but there are two (recur)s in that block
oh, nvm that’s the outer recur
so is this a bug or no?
yeah, that’s what happens, it does notice it can’t write a particular value, but it doesn’t stop trying to read the next value
the code is definitely not doing what the docstring says, so I’d say it’s a bug
@alexmiller You're referring to the close! here? https://github.com/clojure/core.async/blob/core.async-0.4.474/src/main/clojure/clojure/core/async.clj#L527
But we'd escape the go-loop's recur if (nil? p), no?
no, that’s the check for whether the input is closed
we’re talking about whether the output (`to`) is closed
that >! returns false if it’s closed
which breaks the inner recur, but not the outer recur
so that go loop just keeps merrily reading values from the pipeline
Ah, right.
I think we need to check whether to is closed at the top of the go-loop, and break out if so.
well the first question is, what should the behavior be? that is, is the docstring right.
there is no public api to check whether a channel is closed
if we were to stop reading, we should pay attention to the return value of <!
and not just break the inner recur, but the outer recur, and probably send a signal back through the pipeline channels
that may involve several changes
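One possible shape for that change, as a simplified fragment (it assumes the results and to bindings from the surrounding go block, and flattens one level of the real result-channel nesting; it is not the actual patch): let the inner loop report whether to is still accepting values, and only recur the outer loop when it is:

```clojure
(go-loop []
  (when-some [res (<! results)]
    ;; inner loop returns false as soon as a put on `to` fails,
    ;; i.e. as soon as `to` has been closed
    (when (loop []
            (if-some [v (<! res)]
              (if (>! to v) (recur) false)
              true))
      (recur))))
```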
> there is no public api to check whether a channel is closed
Does it need to be public? There's impl/closed? for use inside clojure.core.async.clj.
well, it's not public for a reason - there's no way to write non-race-condition code that uses closed?
> there's no way to write non-race-condition code that uses closed?
B/c another thread could've closed the channel b/t the check and some subsequent action?
exactly
it’s not public because you generally shouldn’t do that
well I should qualify that I guess - it can give a true positive, but every negative is ambiguous
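The ambiguity looks like this in practice (impl/closed? here is the private clojure.core.async.impl.protocols/closed?, and using it outside core.async itself is exactly what is being warned against):

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

(let [c (async/chan)]
  (when-not (impl/closed? c)
    ;; another thread may call (async/close! c) right here, so the
    ;; "open" answer is already stale by the time we act on it
    (async/put! c :v)))   ;; put! returns false if c closed first
```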
So I just did a
(go-loop []
  (when-not (impl/closed? to)
    (let [p (<! results)]
      (if (nil? p)
        (when close? (close! to))
        (let [res (<! p)]
          (loop []
            (let [v (<! res)]
              (when (and (not (nil? v)) (>! to v))
                (recur))))
          (recur))))))
And now
(let [from (a/to-chan (range 100))
      to   (a/chan 1)]
  (a/pipeline 1 to (map #(doto % println)) from)
  (a/close! to))
;; 1
;; 2