#core-async
2024-04-05
stopa20:04:17

I wanted to play with a batch-available function. The idea:
• It blocks until it batches up min items, but then tries to drain up to max items
• To drain up to max items, I thought I could use the poll! function
Here's what I got so far:

(defn- poll-times! [from n]
  (keep (fn [_] (a/poll! from)) (range 0 n)))

(defn batch-available
  "Batches `from` onto `to`. 
   One batch is _at least_ `min` items, and at _most_ max items."
  [from to min max]
  (a/go-loop
   [items []]
    (if (>= (count items) min)
      (do (a/>! to
                (into items (poll-times! from (- max (count items)))))
          (recur []))
      (let [item (a/!! from 1)
  (a/>!! from 2)
  (a/>!! from 3)
  (a/>!! from 4)
  (a/>!! from 5)

  (a/close! from))
It looks like my poll-times! is not doing what I expect at all though:
; eval (current-form): (a/>!! from 1)
true
; (out) to:  [1]
; --------------------------------------------------------------------------------
; eval (current-form): (a/>!! from 2)
true
; --------------------------------------------------------------------------------
; eval (current-form): (a/>!! from 3)
true
; --------------------------------------------------------------------------------
; eval (current-form): (a/>!! from 4)
true
; --------------------------------------------------------------------------------
; eval (current-form): (a/>!! from 5)
true
; (out) to:  [2]
; (out) to:  [3]
; (out) to:  [4 5]
I think I'm missing something obvious -- all pointers very welcome!

hiredman21:04:46

Add printlns (logging) to the loop in batch-available. You have multiple threads of execution (at least 2 go loops and the REPL thread), and when debugging you'll want to see where they are in relation to each other at any given time.

hiredman21:04:08

It's not clear which part you are taking issue with. You have a comment showing how you started your batch-available loop, but the code in that comment doesn't really match the output shown (there is a take from `from` in the middle), so it's not clear what parameters you started batch-available with, and it's not clear what you expected.

hiredman21:04:50

What REPL are you using? Depending on the REPL, you might be getting output (printing) batched to some degree.

stopa21:04:53

Thanks @U0NCTKEV8! To clarify what I was expecting: I >!! 1 into from. This printed:

to: [1] 
Then, while the go-loop was sleeping for 5 seconds, I >!! 2, 3, 4, 5. I expected a println of:
to: [2, 3, 4, 5]
But I instead got separate printlns:
to: [2]
to: [3]
to: [4, 5]
I am using Conjure on top of nREPL. Will continue debugging and adding printlns.

hiredman21:04:23

nREPL buffers output to some degree.

hiredman21:04:07

So the [2] was likely printed right after you put 2, [3] right after you put 3, and then, for whatever reason, the put of 5 happened fast enough after the put of 4 that they got batched together.

hiredman21:04:20

The printed output gets captured in an output buffer, which nREPL flushes, according to some condition, as an out message to the nREPL client; the client then prints it.

hiredman21:04:53

My suggestion would be to base the batching on a time window, using alts! and a timeout channel, because when it comes down to describing the behavior you want for this kind of batching, you always end up needing time.
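
A minimal sketch of what time-window batching with `alts!` and a timeout channel might look like. This is not code from the thread: the name `batch-by-window` and the parameters `max-items` and `window-ms` are hypothetical, and the flush policy (emit when the batch is full, or when the window opened by the batch's first item elapses) is an assumption about the desired behavior.

```clojure
(require '[clojure.core.async :as a])

(defn batch-by-window
  "Hypothetical helper: batches items from `from` onto `to`.
   A batch is flushed when it holds `max-items` items, or when
   `window-ms` milliseconds have passed since its first item.
   Closes `to` once `from` is closed and drained."
  [from to max-items window-ms]
  (a/go-loop [items    []
              deadline nil] ; timeout channel, created on the first item of a batch
    (let [ports    (if deadline [from deadline] [from])
          [v port] (a/alts! ports)]
      (cond
        ;; The window elapsed: flush whatever has accumulated.
        (and deadline (= port deadline))
        (do (a/>! to items)
            (recur [] nil))

        ;; `from` was closed: flush any partial batch, then close `to`.
        (nil? v)
        (do (when (seq items) (a/>! to items))
            (a/close! to))

        ;; A new item arrived: flush if full, otherwise keep waiting.
        :else
        (let [items (conj items v)]
          (if (>= (count items) max-items)
            (do (a/>! to items)
                (recur [] nil))
            (recur items (or deadline (a/timeout window-ms)))))))))

(comment
  (def from (a/chan 10))
  (def to (a/chan 10))
  (batch-by-window from to 3 100)
  (a/>!! from 1)
  (a/<!! to))
```

Starting the timeout only when the first item of a batch arrives means an idle channel never produces empty batches; a fixed tick would be a different, equally reasonable choice.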

👍 1
stopa14:04:25

Thank you @U0NCTKEV8! Ended up going with a more basic approach, since we needed 'at min one':

(defn- poll-times! [from n]
  (keep (fn [_] (a/poll! from)) (range 0 n)))

(defn take-available!! [ch max]
  (if-some [item (a/
Will keep this in mind when we need more comprehensive batching
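
For readers who want a concrete picture of the "at min one" approach, here is a hedged sketch of one way to write it. It is not the code from the message above, whose body is cut off; the name `take-at-least-one!!` and the parameter `max-items` are hypothetical, and the assumed behavior is: block for the first item, then drain with non-blocking `poll!` calls until the channel has nothing immediately available or the batch is full.

```clojure
(require '[clojure.core.async :as a])

(defn take-at-least-one!!
  "Hypothetical sketch: blocks until one item is available on `ch`,
   then polls without blocking for up to `max-items` items in total.
   Returns a vector of items, or nil if `ch` is closed and empty."
  [ch max-items]
  (when-some [item (a/<!! ch)]
    (loop [items [item]]
      (if (< (count items) max-items)
        ;; poll! returns nil right away when nothing is available.
        (if-some [next-item (a/poll! ch)]
          (recur (conj items next-item))
          items)
        items))))

(comment
  (def ch (a/chan 10))
  (a/>!! ch :a)
  (a/>!! ch :b)
  (take-at-least-one!! ch 5))
```

Because the drain uses only `poll!`, the size of each batch depends on what happens to be buffered at that instant, which is why a time window is the more predictable option when batch sizes matter.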