This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
When I run the following:
(let [from-chan (async/to-chan (drop 1 (range)))
      to-chan   (async/chan 10)]
  (async/pipeline-async 1
                        to-chan
                        (fn [v c]
                          (if (> v 10)
                            (do (>!! c :end) (async/close! c))
                            (do (>!! c v) (async/close! c))))
                        from-chan)
  (println (<!! (async/reduce (fn [acc v]
                                (println v)
                                (if (= :end v)
                                  (do (async/close! to-chan) acc)
                                  (inc acc)))
                              0
                              to-chan))))
I get something like:
1 2 3 4 5 6 7 8 9 10 :end :end :end :end :end :end :end :end :end :end :end :end
It seems more elements of the input are being consumed than I expect (note the large number of :ends). What's going on here? Chunking? Or something else?
Basically, I want to perform (async, though the example isn't) operations w/ a seq of increasing integers, stop processing when some integer returns a certain value, and put results until the stopping integer into another channel.
are the number of :end results equal to your channel buffer, even if you change the buffer?
yeah, I think it's an interaction between the buffering and channel-closing behaviors
@tbaldridge Got a question about pipeline. The doc says:
> Will stop consuming the from channel if the to channel closes.
But when I run:
(let [from-chan (async/to-chan (range))
      to-chan   (async/chan 1)]
  (async/pipeline 1 to-chan (map #(doto % println)) from-chan)
  (async/close! to-chan)
  (<!! to-chan))
from-chan doesn't stop. Am I missing something here?
@xiongtx there's already one pending item in the pipeline, try taking a few from it and see if it clears out after a couple
@alexmiller I see a similar result w/ e.g. (take 100 (range)). Values up to 99 are printed out.
In my mind, this shouldn't happen, since we only pulled a single value from to-chan (which has a buffer of 1). So where are all the processed values being put?
pipeline spins up a bunch of machinery behind the scenes. There is a go block that will read from from-chan while values are ready (in this case, you will have as many as 100 values on the channel and ready to go). For each input value, it creates a 1-slot output channel and then enqueues the input value / output channel pair into a jobs channel (sized with n, here 1). There are n (here 1) go blocks that pull from that jobs channel, compute the result and put it in the value-specific output channel. And there is a go block that pulls from a results channel of all the output channels, pushing values into the to-chan as able.
So because there is work to do, the consumer go block will be pulling those as fast as it can and making jobs
The (1) worker go block will take jobs off the stack, make the result and enqueue in the results channel
The output go block will take one result out and feed it to to-chan, then wait for space to become available in the to-chan buffer
the results channel is also sized to n (here 1) so everything will be backed up for the jobs
however, transducers may be applied on the calling thread in some cases and I think that’s what’s happening here
the first go block that is building the transducer chans is effectively doing the work when it creates the job result chan
that is confusing, since the worker go block (which is controlled by the parallelism) is not actually the one doing the work. it’s been a while since I thought about this and I’m trying to recall what controls the front-end execution and whether this would always occur or under what conditions
you’re definitely also affected by setting up the scenario to have 100 pre-available items due to using to-chan
hmm, looking at the code again, I’ll go back on some of that. the first go block is not creating the channel with xf; that happens in the worker go block, so that’s good
a pipeline ends up acting like a buffer with a size like input-buffer + output-buffer + parallelism = buffer-size
I looked in to this a bit https://ce2144dc-f7c9-4f54-8fb6-7321a4c318db.s3.amazonaws.com/buffer.html
before the first take on to-chan, you can have values “buffered” in to-chan (1), 1 value closed over in the output go block, one buffered in the result channel (really N), and 1 value closed over in the processor go (really N). so it’s really 1 + 2N + take buffer, something like that (assuming you always have input values available).
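One way to probe the "1 + 2N + take buffer" estimate is to count how many inputs reach the transducer while nothing is taken from the to channel. This is only a rough sketch: the function name `seen-before-first-take` is made up for illustration, and the fixed sleep is an assumption about how long the pipeline needs to stall, not a guarantee.

```clojure
(require '[clojure.core.async :as async])

(defn seen-before-first-take
  "Returns how many inputs reached the transducer while nobody takes from
  the to channel. `n` is the pipeline parallelism, `to-buf` the to-chan
  buffer size. Hypothetical helper, not part of core.async."
  [n to-buf]
  (let [seen (atom 0)
        from (async/to-chan (range 100)) ; input is always available
        to   (async/chan to-buf)]
    ;; The transducer counts every value it is applied to, then passes it on.
    (async/pipeline n to (map (fn [v] (swap! seen inc) v)) from)
    ;; Crude: assume half a second is enough for the pipeline to back up.
    (Thread/sleep 500)
    @seen))

;; Compare the counts for different parallelism and buffer sizes.
(println (seen-before-first-take 1 1))
(println (seen-before-first-take 2 1))
(println (seen-before-first-take 1 5))
```

The counts should stay well below 100 because the to channel is never closed here, so back-pressure still applies; the runaway consumption only shows up once close! is involved.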
however, the close! is the spoiler - closing prevents new values from being put on the to-chan
but the out go is ignoring that feedback and just continuously spitting things at the to-chan and failing every time
I’d say “Will stop consuming the from channel if the to channel closes.” is a lie right now
it would be more accurate to say “will mercilessly consume the from channel if the to channel closes”
actually, it does check the return value of >! but there are two (recur)s in that block
yeah, that’s what happens, it does notice it can’t write a particular value, but it doesn’t stop trying to read the next value
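The feedback being discussed is the return value of a put: on a closed channel it is false, and a take from a closed, empty channel is nil. The sketch below shows both, plus a blocking copy loop that stops reading as soon as a put is refused. `copy-until-closed` is a hypothetical helper written for this illustration; it is not how pipeline is implemented.

```clojure
(require '[clojure.core.async :as async :refer [<!! >!!]])

;; A channel that is already closed.
(def closed-chan
  (doto (async/chan 1) async/close!))

(def put-result  (>!! closed-chan :v)) ; false: the put was refused
(def take-result (<!! closed-chan))    ; nil: closed and empty

(defn copy-until-closed
  "Blocking copy from `from` to `to`; returns the number of values delivered.
  Stops as soon as `from` is drained or a put on `to` is refused, so no
  further input is read after the first failed put."
  [from to]
  (loop [n 0]
    (let [v (<!! from)]
      (if (and (some? v) (>!! to v))
        (recur (inc n))
        n))))

;; One value is taken from the input, its put is refused, and the loop exits.
(println (copy-until-closed (async/to-chan (range 100)) closed-chan))
;; prints 0
```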
@alexmiller You're referring to the close! here? https://github.com/clojure/core.async/blob/core.async-0.4.474/src/main/clojure/clojure/core/async.clj#L527
But we'd escape the go-loop's recur if (nil? p), no?
I think we need to check whether to is closed at the top of the go-loop, and break out if so.
well the first question is, what should the behavior be? that is, is the docstring right.
and not just break the inner recur, but the outer recur, and probably send a signal back through the pipeline channels
> there is no public api to check whether a channel is closed
Does it need to be public? There's impl/closed? for use inside
well, it's not public for a reason - there's no way to write non-race-condition code that uses closed?
> there's no way to write non-race-condition code that uses closed?
B/c another thread could've closed the channel b/t the check and some subsequent action?
well I should qualify that I guess - it can give a true positive, but every negative is ambiguous
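To make the ambiguity concrete, here is a small sketch of check-then-act going stale. It assumes `closed?` from the internal namespace `clojure.core.async.impl.protocols` (the `impl/closed?` mentioned above), which is not public API. `check-then-put` and its `interfere!` argument are hypothetical names; `interfere!` stands in for whatever another thread might do between the check and the put.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

(defn check-then-put
  "Checks impl/closed?, runs `interfere!` (standing in for another thread),
  then attempts a blocking put. Hypothetical helper for illustration."
  [c interfere!]
  (let [looked-open? (not (impl/closed? c))] ; snapshot, may already be stale
    (interfere!)
    {:looked-open?  looked-open?
     :put-accepted? (async/>!! c :v)}))

(let [c (async/chan 1)]
  (println (check-then-put c #(async/close! c))))
;; prints {:looked-open? true, :put-accepted? false}
```

The "not closed" answer said nothing about the put that followed; only the put's own return value is reliable.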
So I just did a
(go-loop []
  (when-not (impl/closed? to)
    (let [p (<! results)]
      (if (nil? p)
        (when close? (close! to))
        (let [res (<! p)]
          (loop []
            (let [v (<! res)]
              (when (and (not (nil? v)) (>! to v))
                (recur))))
          (recur))))))
(let [from (a/to-chan (range 100))
      to   (a/chan 1)]
  (a/pipeline 1 to (map #(doto % println)) from)
  (a/close! to))
;; 1
;; 2