#core-async
2022-04-23
benny 18:04:09

is there a way to drain a channel? specifically for testing

didibus 18:04:42

As in, take until it blocks? But don't block?

didibus 18:04:50

Or take until it closes?

benny 18:04:00

take until it blocks or closes

benny 18:04:10

blocks probably would be ideal

benny 18:04:58

the code being tested will publish a known number of messages to the channel, but I'm trying to avoid the looping and simplify the code in the test

didibus 18:04:36

You can use poll! and just poll! until it returns nil
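
The poll!-until-nil suggestion could be sketched roughly as follows. The helper name drain-now! is hypothetical and not part of core.async, and the sketch assumes nothing else is putting onto the channel while it runs.

```clojure
(require '[clojure.core.async :as a])

(defn drain-now!
  "Takes every value that is immediately available on ch, without blocking.
  Hypothetical helper, not part of core.async."
  [ch]
  (loop [acc []]
    ;; poll! returns nil when nothing is immediately available
    (if-some [v (a/poll! ch)]
      (recur (conj acc v))
      acc)))

(def ch (a/chan 10))
(doseq [i (range 3)] (a/>!! ch i)) ; buffer has room, so these puts do not block
(drain-now! ch) ;; => [0 1 2]
```

Using if-some rather than a plain truthiness check keeps the loop going if a false value is ever put on the channel.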

benny 18:04:37

that should do it! thanks!

didibus 19:04:42

I think it's possibly not perfect: it is technically possible that you poll! the last thing pending in the channel, but by the time the next iteration calls poll! again, something else has been delivered to the channel.

benny 19:04:43

right, there is, technically, a race condition right?

didibus 19:04:46

So there's a small time window where poll! would have returned nil, but because something got delivered concurrently, it succeeds for another round.

didibus 19:04:44

Like if I run this:

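(require '[clojure.core.async :as a])

(def chan (a/chan 100)) ; buffer holds up to 100 items
(def ccount (atom 0))
;; The producer tries to put 300 items and parks whenever the buffer is full.
(a/go
  (dotimes [i 300]
    (a/>! chan i)))
;; The consumer counts items until poll! finds nothing immediately available.
(while (a/poll! chan)
  (swap! ccount inc))
(println @ccount)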
It doesn't always print exactly 100. Sometimes it is a bit more, probably because the go block added more things when poll! took the 100th item, so by the time poll! was called again there were more things to poll.

didibus 19:04:37

The thing is, you can't lock the channel 😛 that's kind of the whole point of core.async. So I'm not sure if there's a better way.

didibus 19:04:09

There might be an interop into the implementation that could return the buffer count

didibus 19:04:56

Seems you can do this:

(count (.-buf chan))

didibus 19:04:12

And it'll just tell you how many things are in the channel's buffer

didibus 19:04:44

(.full? (.-buf chan))
This tells you whether it is full. So in your test, you could loop until full? returns true, and then check that the count is the one you expect.

didibus 19:04:21

It won't be thread safe, but I think for your use case it is fine.

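didibus 19:04:26

(require '[clojure.core.async :as a])

(do
  (def chan (a/chan 100))
  (a/go
    (dotimes [i 300]
      (a/>! chan i)))
  ;; Busy-wait until the buffer fills; relies on core.async implementation details.
  (while (not (.full? (.-buf chan))))
  (println (count (.-buf chan))))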
Always gives me 100 exactly

Ben Sless 02:04:36

Use async/reduce and collect everything that way. It returns a channel that will deliver the result

benny 04:04:54

it seems like the first poll on the chan is nil so it always fails 😕

Ben Sless 05:04:01

@U4ZJ5UHQD don't use poll in this case, use reduce. Reduce is always the way to drain/consume a source. It respects channel semantics, blocking and time. Poll doesn't

didibus 05:04:56

Reduce will drain till close though, not till the first yield

didibus 05:04:32

> ch must close before reduce produces a result
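
A minimal sketch of the reduce approach, assuming the producer closes the channel once it has put everything (the name received is illustrative only):

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 100))

;; Producer: puts a known number of values, then closes the channel.
(a/go
  (dotimes [i 300]
    (a/>! ch i))
  (a/close! ch))

;; a/reduce returns a channel; <!! blocks until ch has closed
;; and the reduced result is available.
(def received (a/<!! (a/reduce conj [] ch)))
(count received) ;; => 300
```

Without the close! call, the blocking take on the result channel would never return, which is the caveat quoted above. (a/into [] ch) is a shorter way to collect the same values.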

didibus 05:04:12

I guess it depends on what the ask is. I understood it as: can I test that I have delivered what I expected to the channel buffer, up to the first park/block caused by the buffer being full?

Ben Sless 05:04:51

If it's to first yield then it's not a drain. Just do a blocking take

Ben Sless 05:04:16

Besides, the producer should close the channel when it finishes
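
For a test that expects a known number of messages, the blocking-take suggestion might look like the sketch below. The helper take-n!! and its timeout parameter are hypothetical, not part of core.async; the timeout is an added assumption so that a failing test returns early instead of hanging.

```clojure
(require '[clojure.core.async :as a])

(defn take-n!!
  "Blocks until n values have been taken from ch, ch closes, or timeout-ms
  elapses overall. Returns the values taken so far as a vector.
  Hypothetical test helper, not part of core.async."
  [ch n timeout-ms]
  (let [deadline (a/timeout timeout-ms)]
    (loop [acc []]
      (if (= (count acc) n)
        acc
        (let [[v port] (a/alts!! [ch deadline])]
          ;; v is nil when the deadline fired or ch was closed
          (if (and (= port ch) (some? v))
            (recur (conj acc v))
            acc))))))

(def ch (a/chan 100))
(a/go (dotimes [i 5] (a/>! ch i)))
(take-n!! ch 5 1000) ;; => [0 1 2 3 4]
```

A test can then compare the returned vector against the expected messages; a shorter vector means the producer delivered too few values before the deadline.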