Fork me on GitHub

I need to use a chan that expects n values put on it then closes. n will usually be small, less than 10, but in certain scenarios will be a few thousand messages that come in a few milliseconds, which I've found is impossible to for any consumers to keep up with before hitting the pending put! limit. What's people's opinion on the best way to address this? Just use a fixed buffer of a number large enough to definitely account for all cases of n? From what I gather from quickly skimming the async source, a fixed buffer doesn't actually allocate anything, so having a very liberal fixed buffer for chan of say 1million even when most of the time it will only buffer one or two items tops wouldn't actually be a big inefficiency right?


Alternatively I could implement an unlimited buffer, but I'm thinking that would probably be worse


or a dropping-buffer, or a sliding-buffer


but if you know ahead of time a fixed number of messages will be put on a channel, it is often best to create the channel with a buffer that size


a very common case of this is, if you know you will only ever put one item to a channel, create the channel with buffer size 1, you'll see this used in a number of places in the core.async source


But if you are hitting the limit on puts, that is usually an indication of a bad pattern


i can't lose data so sliding/dropping buffers are out


and sometimes i can know the exact number of expected messages but there will be times when I can guestimate at best


are you using put! ?


if you are on clojure, switching to >!! will likely superficially solve your problem


Actually I belive I tried that but still hit the pending puts error


because it's like 10k messages coming in basically "simultaneously"


coming in from where?


external api that pushes data to me


ok, so you need to communicate backpressure to that


but what if there's no mechanism for it to "slow down"


then you need an infinite buffer


which core.async purposefully doesn't give you


I think I can get fancy, and when possible choose an exact buffer size needed, or in the case of the large burst messages, choose the minimal buffer size that allows the messages to be kept up with. But yeah I was just wondering if there's a reason to implement the infinite buffer instead of just choosing a fixed buffer size that's effectively infinite?


there almost certainly is some kind of backpressure mechanism in whatever way the api is pushing data to you


tcp has one built in


Are you saying the infinite buffer is preferable because the fact that a fixed buffer doesn't currently allocate anything proportional to its buffer size is just an implementation detail and may change in the future?


but using put! without a callback, or something else, can break the chain of backpressure

Alex Miller (Clojure team)19:08:17

the buffer you're hitting is not the channel buffer though and is not configurable

Alex Miller (Clojure team)19:08:09

if I understand correctly


yeah, I get that there's no way around that, I need to either implement a unlimited buffer or chose an effectively infinite fixed buffer size


just wondering which of those is better


it's for a wrapper of a java stock broker api, most of the request types return a single message, but you can request historical data which can return like 10k messages, and unfortunately they all come in one burst and the api provides no mechanism of control for this


core.async isn't a perfect fit for this use case I know but currently I'm using my own hand rolled combo of atoms and promises and I'm starting to thing bending core.async to my will would be better than managing all the concurrency myself


but somewhere you have a single bit of code that is taking those messages and putting them into a channel


and you are losing the backpress right there, you should not move on to put in the next thing until the current thing has been accepted


ah! this makes sense. so maybe doing something like put!ing the next item in the callback of the first put!


yes, or use >!! which will block the real java thread until the input is accepted


ah ok cool. I was pretty sure that I tried both >!! and put! and was getting the pending puts exceeded error message either way, but I don't have the example immediately in front of me


using >!! will block that single thread, so if you have a single loop taking messages and putting them in to a channel that will work, if you have multiple concurrent places that is happening you can still run into the issue with >!!


gotcha, thanks for all the help


for simple cases, using >!! will often solve backpressure isssues, for more complicated cases put! with a callback, and of course a lot of example docs show using put! without a callback which is asking for a backpress issue


> which is asking for a backpress issue Do you mean put! without a callback is asking for issues caused by not exerting backpressure? i.e. you’re at the mercy of the system incidentally rather than explicitly. i.e. enforcing backpressure through a lack of resources, such as through the 1024 limit, memory leaks due to excessive spawned go blocks etc?


gotcha yeah that's good to know


I don't think I've ever used the put! callback before


offer! also allows to check if put was successful or not, in a non blocking way


You re hitting the 1024 pending put limit per channel. As others mentioned you need to be able to signal backpressure upstream or use something like credit-based flow control or rate limiting.