#core-async
2019-08-28
jjttjj19:08:50

I need to use a chan that expects n values put on it and then closes. n will usually be small, less than 10, but in certain scenarios will be a few thousand messages arriving within a few milliseconds, which I've found is impossible for any consumer to keep up with before hitting the pending put! limit. What's people's opinion on the best way to address this? Just use a fixed buffer of a number large enough to definitely account for all cases of n? From what I gather from quickly skimming the async source, a fixed buffer doesn't actually preallocate anything, so having a very liberal fixed buffer for a chan of, say, one million, even when most of the time it will only buffer one or two items tops, wouldn't actually be a big inefficiency, right?

jjttjj19:08:41

Alternatively I could implement an unlimited buffer, but I'm thinking that would probably be worse

hiredman19:08:50

or a dropping-buffer, or a sliding-buffer
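
For reference, a sketch of those two lossy buffer types (assuming `[clojure.core.async :as a]` is required):

```clojure
;; Both cap memory, but both lose data when the consumer falls behind:
(a/chan (a/dropping-buffer 1024)) ; when full, new puts are silently discarded
(a/chan (a/sliding-buffer 1024))  ; when full, the oldest buffered value is dropped
```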

hiredman19:08:36

but if you know ahead of time a fixed number of messages will be put on a channel, it is often best to create the channel with a buffer that size

hiredman19:08:42

a very common case of this is, if you know you will only ever put one item to a channel, create the channel with buffer size 1, you'll see this used in a number of places in the core.async source
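
A minimal sketch of that single-reply pattern; `request-once` and `send-request!` are made-up names for illustration:

```clojure
(require '[clojure.core.async :as a])

;; One expected reply, so a buffer of size 1: the producer's put!
;; completes immediately even if no consumer is parked on the channel yet.
(defn request-once [send-request!]
  (let [reply (a/chan 1)]
    (send-request!
     (fn [response]
       (a/put! reply response)
       (a/close! reply)))
    reply))
```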

hiredman19:08:43

But if you are hitting the limit on puts, that is usually an indication of a bad pattern

jjttjj19:08:14

I can't lose data so sliding/dropping buffers are out

jjttjj19:08:49

and sometimes I can know the exact number of expected messages, but there will be times when I can guesstimate at best

hiredman19:08:16

are you using put! ?

hiredman19:08:56

if you are on clojure, switching to >!! will likely superficially solve your problem

jjttjj19:08:18

Actually I believe I tried that but still hit the pending puts error

jjttjj19:08:47

because it's like 10k messages coming in basically "simultaneously"

hiredman19:08:03

coming in from where?

jjttjj19:08:20

external api that pushes data to me

hiredman19:08:34

ok, so you need to communicate backpressure to that

jjttjj19:08:51

but what if there's no mechanism for it to "slow down"

hiredman19:08:33

then you need an infinite buffer

hiredman19:08:42

which core.async purposefully doesn't give you

jjttjj19:08:38

I think I can get fancy and, when possible, choose the exact buffer size needed, or in the case of the large message bursts, choose the minimal buffer size that allows the messages to be kept up with. But yeah, I was just wondering if there's a reason to implement the infinite buffer instead of just choosing a fixed buffer size that's effectively infinite?

hiredman19:08:21

there almost certainly is some kind of backpressure mechanism in whatever way the api is pushing data to you

hiredman19:08:32

tcp has one built in

jjttjj19:08:08

Are you saying the infinite buffer is preferable because the fact that a fixed buffer doesn't currently allocate anything proportional to its buffer size is just an implementation detail and may change in the future?

hiredman19:08:13

but using put! without a callback, or something else, can break the chain of backpressure

Alex Miller (Clojure team)19:08:17

the buffer you're hitting is not the channel buffer though and is not configurable

Alex Miller (Clojure team)19:08:09

if I understand correctly

jjttjj19:08:16

yeah, I get that there's no way around that, I need to either implement an unlimited buffer or choose an effectively infinite fixed buffer size

jjttjj19:08:23

just wondering which of those is better

jjttjj19:08:29

it's for a wrapper of a Java stock broker API; most of the request types return a single message, but you can request historical data, which can return like 10k messages, and unfortunately they all come in one burst and the API provides no mechanism to control this

jjttjj19:08:40

core.async isn't a perfect fit for this use case, I know, but currently I'm using my own hand-rolled combo of atoms and promises, and I'm starting to think bending core.async to my will would be better than managing all the concurrency myself

hiredman19:08:52

but somewhere you have a single bit of code that is taking those messages and putting them into a channel

hiredman19:08:24

and you are losing the backpressure right there, you should not move on to putting the next thing until the current thing has been accepted

jjttjj19:08:08

ah! this makes sense. so maybe doing something like put!-ing the next item in the callback of the first put!
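
Roughly what that chained-callback approach could look like (a sketch; `put-all!` is an invented helper):

```clojure
;; Only issue the next put! from the previous put!'s callback, so the
;; channel's backpressure is respected without blocking a thread and
;; without ever queueing more than one pending put.
(defn put-all! [ch msgs]
  (when-let [[m & more] (seq msgs)]
    (a/put! ch m
            (fn [accepted?]
              (when accepted?       ; false means ch was closed
                (put-all! ch more)))
            false)))                ; on-caller? false: run the callback on a
                                    ; dispatch thread, so immediate puts don't
                                    ; recurse and blow the stack on 10k messages
```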

hiredman19:08:12

yes, or use >!!, which will block the real Java thread until the input is accepted
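
And the blocking variant (a sketch; `ch` and `messages` stand in for the channel and the incoming burst):

```clojure
;; A single real thread drains the burst into the channel; >!! blocks
;; that thread until each value is accepted, so pending puts never pile up.
(a/thread
  (doseq [m messages]
    (a/>!! ch m))
  (a/close! ch))
```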

jjttjj19:08:34

ah ok cool. I was pretty sure that I tried both >!! and put! and was getting the pending puts exceeded error message either way, but I don't have the example immediately in front of me

hiredman19:08:29

using >!! will block that single thread, so if you have a single loop taking messages and putting them into a channel, that will work; if you have multiple concurrent places where that is happening, you can still run into the issue with >!!

jjttjj19:08:20

gotcha, thanks for all the help

hiredman19:08:14

for simple cases, using >!! will often solve backpressure issues; for more complicated cases, put! with a callback. And of course a lot of example docs show using put! without a callback, which is asking for a backpressure issue

rickmoynihan08:08:14

> which is asking for a backpressure issue

Do you mean put! without a callback is asking for issues caused by not exerting backpressure? i.e. you're at the mercy of the system incidentally rather than explicitly, i.e. enforcing backpressure through a lack of resources, such as through the 1024 limit, memory leaks due to excessive spawned go blocks, etc.?

jjttjj19:08:44

gotcha yeah that's good to know

jjttjj19:08:19

I don't think I've ever used the put! callback before

mpenet20:08:41

offer! also lets you check whether a put was successful or not, in a non-blocking way
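
For example (a sketch; `ch` and `msg` assumed):

```clojure
;; offer! never blocks and never queues a pending put: it returns true
;; if a buffer slot or a waiting taker accepted the value immediately,
;; and nil otherwise.
(if (a/offer! ch msg)
  :accepted
  :full) ; e.g. log it, drop it, or retry later
```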

mpenet20:08:02

You're hitting the 1024 pending-put limit per channel. As others mentioned, you need to be able to signal backpressure upstream, or use something like credit-based flow control or rate limiting.
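
One hedged sketch of what credit-based flow control could look like here (not from the thread; `ch`, `msg`, and `process` are assumed names):

```clojure
;; Cap in-flight messages at 100: the producer must take a credit
;; before each put, and the consumer returns the credit once it has
;; actually processed the message.
(def credits (a/chan 100))
(dotimes [_ 100] (a/>!! credits :credit)) ; issue the initial credits

;; producer side
(a/go
  (a/<! credits)
  (a/>! ch msg))

;; consumer side
(a/go
  (let [m (a/<! ch)]
    (process m)
    (a/>! credits :credit)))
```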