#core-async
2019-12-19
erwinrooijakkers10:12:26

What is the best way to read from a channel till a certain condition is met (e.g., 2000 elements taken)?

bortexz10:12:32

You can apply a transducer to that channel, in this case (take 2000)
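A minimal sketch of the transducer suggestion, assuming core.async is required under the `async` alias and using 3 in place of 2000 to keep it short. The names `limited` and `result` are made up for illustration. The channel closes itself once `(take 3)` has let three values through, so draining it with `async/into` yields exactly those three.

```clojure
(require '[clojure.core.async :as async])

;; Channel whose transducer lets only the first 3 values through
;; (3 stands in for 2000); the buffer size of 1 is an arbitrary choice.
(def limited (async/chan 1 (take 3)))

;; Producer: tries to put 10 values. Once the transducer is done the
;; channel is closed and the remaining puts simply return false.
(future
  (doseq [i (range 10)]
    (async/>!! limited i)))

;; Block until the channel closes, collecting everything it delivered.
(def result (async/<!! (async/into [] limited)))
```

Note that this works when you create the channel yourself; attaching a transducer to an already existing channel means piping it into a new one, which is where the extra take discussed in this thread comes from.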

fmjrey11:12:08

also curious to know if there is a definite answer yet

yonatanel13:12:41

The pipe go-loop takes from one channel and puts onto the other until either channel is closed. The first put of 0 closes the second channel, but it is technically successful (the put itself succeeded, even though no further items will be accepted), so pipe iterates one more time, takes 1, and only then encounters the closed channel and quits. By that point the 1 is already out of the source channel.
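The behaviour described above can be reproduced with a sketch along these lines. It is not the original snippet from the thread: the names `source`, `dest`, `first-out` and `remaining` and the 200 ms pause are assumptions made for illustration. The pause is only there to let the pipe's go-loop finish before the source is inspected.

```clojure
(require '[clojure.core.async :as async])

;; Source holds 0..4 in its buffer before the pipe starts.
(def source (async/chan 10))
(doseq [i (range 5)]
  (async/>!! source i))

;; Destination closes itself after one value because of (take 1).
(def dest (async/chan 1 (take 1)))
(async/pipe source dest)

;; The only value that ever comes out of dest.
(def first-out (async/<!! dest))

;; Pause so the pipe's go-loop can finish its second iteration
;; (assumed to be ample time; pipe exposes no completion signal).
(Thread/sleep 200)

;; Drain whatever is still left in the source.
(async/close! source)
(def remaining (async/<!! (async/into [] source)))

;; first-out is 0, and remaining is expected to start at 2:
;; the 1 was taken by pipe and then dropped on the closed dest.
```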

fmjrey14:12:02

Great explanation, thanks! When I tried this variant I got non-deterministic results: https://clojurians.slack.com/archives/C05423W6H/p1576247793020000?thread_ts=1576243305.010900&cid=C05423W6H

fmjrey14:12:00

So I suppose there are some concurrency aspects to it as well.

yonatanel16:12:41

It’s a race between your take and the internal pipe take. One of you will get the 1 and become a funk king.

fmjrey17:12:30

Makes sense, which means that race condition is also present in the original example given by @U66G3SGP5. Adding a few log/locking/println statements in my version did the trick to reveal the race condition.

roklenarcic19:12:27

Of course there’s a race condition there. That is intentional: after creating the pipe I read from the source of that pipe directly. The point of that snippet wasn’t to show working code, it was to show that an additional item was read from the source channel.

erwinrooijakkers12:12:50

Thanks I saw that one yes

erwinrooijakkers12:12:21

But have had no time to test yet

erwinrooijakkers12:12:30

I really want 2000 messages and only then continue

erwinrooijakkers12:12:02

transducer that does (take 2000)

erwinrooijakkers12:12:15

Would be perfect

erwinrooijakkers12:12:23

I’ll check this evening 🙂
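One way to get "exactly N messages, then continue" without a pipe is core.async's own `async/take` combined with `async/into`. This is a sketch under assumptions: 3 stands in for 2000, and `source`, `batch` and `next-value` are illustrative names. `async/take` stops after N items, so the rest stays on the source channel.

```clojure
(require '[clojure.core.async :as async])

(def source (async/chan 10))

;; Producer running elsewhere; the buffer of 10 means none of these puts block.
(future
  (doseq [i (range 10)]
    (async/>!! source i)))

;; Block until exactly 3 values have been collected, then carry on.
(def batch (async/<!! (async/into [] (async/take 3 source))))

;; The source channel is still open and continues where the batch ended.
(def next-value (async/<!! source))
```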

mpenet17:12:30

playing with core.async:

(def c (async/chan (async/buffer 0) x))
(future (async/>!! c 1))
(async/<!! c) ;; returns  1

mpenet17:12:25

I know you're supposed to have an actual non-empty buffer for xform to work, but I was surprised we didn't get an early throw in that case (at channel creation time)

mpenet17:12:00

x is (map inc)

Alex Miller (Clojure team)17:12:28

(async/chan 0) will throw

Alex Miller (Clojure team)17:12:01

prob just an oversight that (async/buffer 0) doesn't (far more common to create fixed buffers directly in chan)

Alex Miller (Clojure team)17:12:15

yeah, that's just a badly placed assertion, will fix

mpenet17:12:24

I mean it sort of works as an unbuffered channel with (buffer 0)

mpenet17:12:04

yes, hence the "sort of"

mpenet17:12:24

implementation for unbuffered channel is probably quite smart

mpenet17:12:11

:thumbsup: