Fork me on GitHub
Ben Grabow22:05:45

I've spent the last day and a half wondering at some of my core.async code and ultimately found an annoying gotcha in core.async/pipe. Pipe says on the tin that it takes elements from the from channel and puts them onto the to channel, and stops consuming from when to closes. What it fails to mention is that when to closes, the pipe worker will consume ONE MORE ELEMENT from from that then gets dropped on the floor when the pipe worker notices that to is closed. If you have any expectation to continue consuming elements from from in another manner, you will never see that dropped element.

Ben Grabow22:05:49

Is this the expected behavior? Am I using pipe in an unusual way where the from channel continues to be read from in a second context after the first to consumer closes? Should this behavior be called out in the docs?


From reading the code i don’t see how that is possible... do you have a repro case?

Ben Grabow22:05:34

Just a moment, I'll put one together.


Clj or cljs

Ben Grabow22:05:33

For now, imagine the pipe worker is parked at (let [v (<! from)] then the to channel closes. What happens to v? Does it get delivered to anything?

Ben Grabow22:05:46

CLJ for my use case


heh I see what you are saying


There’s no transactional conveyance from from to to

Ben Grabow22:05:57

that's a good way to put it


The in flight v is dropped on the floor


In general you might always produce n+1 things in response to n demand

Ben Grabow22:05:00

(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)
  (async/close! to-a)
  (async/pipe from to-b)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))

Ben Grabow22:05:14

Interestingly the first time I ran this I got :1 but got :2 on every subsequent run

Ben Grabow22:05:33

Is close! asynchronous? I can't think of why I would get :1 otherwise.

Ben Grabow22:05:59

Oh, I suppose the second pipe might make progress before the first pipe does.

Ben Grabow22:05:25

Then :1 would be earmarked for delivery to to-b instead of to-a

Ben Grabow22:05:03

This should be pretty reliable then:

(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)
  (Thread/sleep 10)
  (async/close! to-a)
  (async/pipe from to-b)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))


it is complicated


in general it is better to use >! and <! ops to communicate to impose ordering over trying to do it via close!

Ben Grabow22:05:19

I'm afraid I don't see the relevance of that advice.

Ben Grabow22:05:56

If I use pipe to hook up a producer and consumer, and the consumer expires but I want to make sure the producer gets rerouted to a new consumer, I don't see how I can make the changeover reliably without leaking an element on the floor.

Ben Grabow22:05:29

Maybe pipe is just not intended to be used in this way, which is fine, just not what I expected when I read the docstring.


you are trying to reason about how the time lines of the go blocks spun up by the pipes, and the go block you spin up, and the main thread interact

Ben Grabow22:05:13

Oh, sure in my repro example it's just a kludge to get it working "most of the time"

Ben Grabow22:05:16

I'm not sure how I can make it a watertight example though since it's always nondeterministic how much progress the first pipe worker will make before I hook up the second pipe worker.


in general, in core.async anything that copies from one channel and adds to another effectively acts as an extra buffer of size at least 1 in the whole system


and there isn't really a way(in core.async) to make it transactional so you don't need that extra buffer of at least 1

Ben Grabow22:05:39

I think my surprise is mostly from the docstring that says Will stop consuming the from channel if the to channel closes". Now that I've noticed the nuance here it makes sense why it's not transactional and I'm fine with that behavior now that I'm aware of it, but I wonder if the docstring could be improved to call out this edge case.

hiredman22:05:25 is a doctoral thesis (beautifully typeset) aimed at making it possible to do the transactional thing, it is neat

Ben Grabow22:05:57

pipeline has the same verbiage in the docstring and the same behavior. I haven't had time to look through all the other cases that might be similar. I feel like I'm a relatively experienced user of core.async and this mystified me and my coworkers.


pipeline is even more of a buffer then pipe


pipe introduces a buffer of 1, pipeline introduces a buffer of at least n (the parallelism number you pass in)

Ben Grabow22:05:07

Yeah, the fact that there's a buffer doesn't surprise me at all. The fact that the contents of the buffer could be dropped when the destination changes is what suprprised me.


yeah, it is important to keep in mind you don't have a network of channels, you have channels connecting a network of things that do computation, and once data is out of a channel in in the computation, it is gone from the channel, so even if pipe wanted to, it can't put it back at the front of the channel

Ben Grabow22:05:38

Yep that makes sense.

Ben Grabow22:05:05

I don't have a question as much as a request: Can the docstring of pipe (and probably pipeline too?) be improved to call attention to this behavior?

Ben Grabow22:05:44

I want to save other people the trouble of figuring this out for themselves.


the best way to do that may be to post something on

Ben Grabow23:05:16

I'll do that, thanks.

Ben Grabow22:05:10

Well I suppose I do have a question after all. My use case is basically this:

(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (connect-with-magic producer consumer-a)
  (disconnect-with-magic consumer-a)
  (connect-with-magic producer consumer-b)
  (async/go (async/>! producer :val))
  (async/<! consumer-b)) ;=> :val
Is there something I can put in connect-with-magic and disconnect-with-magic such that :val is guaranteed to be delivered to consumer-b? I'm perfectly okay with produced vals being ignored if they are produced before consumer-b is connected, but once the consumer is connected I need all the produced values to be delivered.


you can write your own copying construct that looks at the return value of the put to determine what to do


(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (async/go-loop []
    (let [msg (async/<! producer)]
      (when-some msg
        (when-not (async/>! consumer-a msg)
          (async/>! consumer-b msg)))))
  (async/close! consumer-a)
  (async/go-loop []
    (let [msg (async/<! producer)]
      (when-some msg
        (when-not (async/>! consumer-b msg)
          (async/>! consumer-a msg)))))
  (async/go (async/>! producer :val))
  (async/<! consumer-b))


oh, and I guess those loops need recurs in there somewhere


basically write to one consumer, and if that fails write to the other


but you have no ordering guarantees there

Ben Grabow23:05:08

I have essentially a seq of consumers. I want to produce to the first consumer until that consumer is closed, then move to the next consumer. If no consumers are connected I'm fine dropping values. So my problem is a little more general than these two consumers.

Ben Grabow23:05:50

I'm trying out mult and tap right now and it looks like it might work for me?


you will race on tapping with mult

Ben Grabow23:05:27

I'm fine with the race on tap as long as the old tap doesn't cause an element to disappear

Ben Grabow23:05:00

The problem with the pipe solution is that once the first pipe worker parks on take, it's impossible for me to recover the next element to be delivered. With mult/tap I at least have a chance of reading the next value.


a race may drop


(defn keep-piping [source consumers]
  (async/go-loop [consumer nil
                  msg nil]
    (cond (nil? consumer)
          (when-let [c (async/<! consumers)]
            (recur consumer msg))
          (nil? msg)
          (when-let [msg (async/<! source)]
            (recur consumer msg))
          (not (async/>! consumer msg))
          (recur nil msg))))


takes a source channel, and a channel to read consumer channels off of, and pipes from the source the first consumer, then the next, etc

Ben Grabow23:05:03

hmm, interesting!

Ben Grabow23:05:23

I think that would work well for me if the component that was connecting consumer to producer knew about all the consumers. Right now my consumers are ignorant of each other, and my producer is ignorant of the consumers, and I have no code that is aware of everyone and responsible for doing the connecting.


if mult or pipe worked, then you do have some central place, you just don't think of it like that

Ben Grabow23:05:41

yep I think you're right

Ben Grabow23:05:53

well, my producer is creating the mult

Ben Grabow23:05:58

and my consumers are creating taps

Ben Grabow23:05:45

the consumer is initiating the action when a tap is created but the tap IS centrally managed in the mult like you say