#core-async
2017-01-31
nooga20:01:25

I’m getting a stream of values from a chan. I want to take them in batches of n, but if there are fewer than n in the chan's buffer I don’t want to block; I want to get all that are available instead

nooga20:01:51

how do I do that with core.async?

nooga20:01:03

but then, I’d like it to block if there are no values available 🙂

joshjones20:01:23

@nooga since you want to do this in batches, you’ll have to roll your own (very simple I would imagine) function that loops using poll! as @ghosss suggested, and then if it gives nil (meaning there’s no values), then just call <!! — maybe there’s already a “take n items” function but I can’t find it.

ghosss20:01:47

something like this? (not thoroughly tested):

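(require '[clojure.core.async :refer [poll! <!!]])

;; Collect up to n more values without blocking; stop at the first nil from poll!.
(defn nooga-take* [n ch res]
  (if (<= n 0)
    res
    (if-some [v (poll! ch)]
      (recur (dec n) ch (conj res v))
      res)))

;; Block for the first value (<!! returns at once if one is already buffered),
;; then drain up to n - 1 more. Returns nil if ch is closed and empty.
(defn nooga-take [n ch]
  (when-some [v (<!! ch)]
    (nooga-take* (dec n) ch [v])))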

joshjones21:01:04

looks about right @ghosss — but @nooga — regarding how this works: if you block on an empty channel, do you only want to return the result when n items are in the channel, or when the first item becomes available?

nooga21:01:04

when n items are in the channel

nooga21:01:27

thanks @ghosss! I was just rolling my own

nooga21:01:19

The situation is as follows: I’ve got lists of values coming from http clients, one or more items per request. I onto-chan the lists, and on the other side I’ve got a go block that inserts the items into a db

nooga21:01:27

the db likes batched inserts

nooga21:01:31

so I want to introduce some kind of rate control and provide a steady stream of batches

nooga21:01:57

on the other hand, I don’t want to loop while there’s nothing coming in

nooga21:01:25

or delay insertion when the traffic is low

joshjones21:01:56

so when the first item becomes available, you only want 1, and not n

nooga21:01:20

joshjones: All available at the moment

joshjones21:01:31

well, if you block on an empty channel, then “all available” will be when the first comes in, so you can only return one, unless you decide to implement a timeout and wait for a second to come in quickly, etc.

nooga21:01:53

But what if I spend, say 1s inserting the previous batch and during that my handlers wrote >1 items onto the channel?

joshjones21:01:19

If the channel is empty, you call get-n-from-chan, then it will block, correct?

nooga21:01:53

oh right!

joshjones21:01:08

so you have two choices: 1) block and return the first item that comes from the channel 2) block and return the first item plus as many or few as you want afterward. That policy is what you determine—the quickest return is, return when the first item becomes available. The slowest is, return when n items have been returned. Another option is to set a timeout that says, return the first and any additional within some timeout period

nooga21:01:39

I’ll experiment with that

ghosss21:01:45

yeah, I once wrote something similar. to take in batches of at most n values but also after some timeout, take as big of a batch as has been built up
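A minimal sketch of that timeout policy, not the code anyone in the channel actually wrote. The name `take-batch` and the parameters `max-n` and `window-ms` are hypothetical. It assumes the batch window starts when the first value arrives, and it uses only `<!!`, `timeout` and `alts!!` from core.async:

```clojure
(require '[clojure.core.async :as a])

(defn take-batch
  "Blocks for the first value on ch, then keeps collecting until the batch
  holds max-n values or window-ms has passed since the first value arrived.
  Returns nil if ch is closed and empty. Hypothetical helper."
  [ch max-n window-ms]
  (when-some [v (a/<!! ch)]
    ;; One timeout channel per batch, so the window is measured from the first value.
    (let [deadline (a/timeout window-ms)]
      (loop [batch [v]]
        (if (>= (count batch) max-n)
          batch
          ;; :priority true makes alts!! prefer ch over the deadline when both are ready.
          (let [[x port] (a/alts!! [ch deadline] :priority true)]
            (if (and (= port ch) (some? x))
              (recur (conj batch x))
              ;; Deadline fired, or ch was closed: return what has been built up.
              batch)))))))
```

With a small `window-ms`, low traffic only delays an insert by that window, while a burst still fills the batch up to `max-n`.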

donaldball21:01:00

oh, I have one of those too

nooga21:01:46

ztellman/manifold has this IIRC

tbaldridge21:01:22

@nooga then what people are suggesting is correct: do a blocking take to get the first item, poll! to get more available items, then stop when you reach max-items or get a nil from poll!. Insert into the DB, then do a blocking take again.
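The loop described here could be sketched roughly as follows. `drain-available`, `start-batch-writer` and `insert-batch!` are hypothetical names, and `insert-batch!` stands in for whatever DB call is actually used. The sketch assumes the insert is blocking I/O, so it runs on `a/thread` rather than inside a go block:

```clojure
(require '[clojure.core.async :as a])

(defn drain-available
  "Blocking take for the first value, then poll! until max-items is reached
  or poll! returns nil. Returns a vector, or nil if ch is closed and empty."
  [ch max-items]
  (when-some [v (a/<!! ch)]
    (loop [batch [v]]
      (if (>= (count batch) max-items)
        batch
        (if-some [x (a/poll! ch)]
          (recur (conj batch x))
          batch)))))

(defn start-batch-writer
  "Repeats take-batch / insert on a dedicated thread until ch is closed.
  Returns the channel from a/thread, which closes when the loop ends."
  [ch max-items insert-batch!]
  (a/thread
    (loop []
      (when-some [batch (drain-available ch max-items)]
        (insert-batch! batch) ; placeholder for the real DB insert
        (recur)))))
```

While an insert is running, new items accumulate in the channel's buffer, so the next call to `drain-available` picks up everything that arrived in the meantime, up to `max-items`.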

nooga22:01:19

@tbaldridge I’m testing this right now, so far looks like a good idea