hello! i think i'm running into an understanding issue with core.async and was hoping someone had some insight i am missing.
i have a channel with a fixed buffer defined like (def ch (async/chan 100)) and then i have a single thread consuming from kafka and putting records onto the channel like (async/>!! ch some-record). The ch channel is being consumed by other threads, however i am triggering an error which makes it sound like >!! is not blocking the way i understand it to work.
java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
❯ clj -A:async
Clojure 1.12.0
user=> (require '[clojure.core.async :as a])
user=> (let [c (a/chan 100)] (dotimes [n 1200] (println n (a/>!! c :value))))
0 true
1 true
2 true
...
97 true
98 true
99 true
and now it’s blocked there. I’m unable to keep putting items onto the channel
How do you know it is a single thread publishing to the channel?
Like are you using some API to pull a single message from Kafka, use >!! to put on the channel, then loop?
Effectively yes, I have a Kafka consumer record moving through a pedestal interceptor chain and it must finish before processing the next record.
I should confirm it’s not going async, but in the case of a ton of threads executing >!!, they should all block right? Or would that trigger this error?
That would trigger this error
Ah ok! I will double check I’m not accidentally triggering async execution in Pedestal then
The limit is on the number of pending operations, it doesn't distinguish between "parking" and "blocking"
Ooo thanks that’s helpful
i’m doing a simulation and it doesn’t line up with that?
user=> (let [c (a/chan 6)] (dotimes [t 10] (a/thread (dotimes [n 1200] (println "thread: " t " item: " n (a/>!! c :value))))))
nil
user=> thread: thread: thread: thread: thread: thread: 5 9 item: item: 0 4 true item:
0 item: 0 true
8 item: 0 true
1 0 0true
true item: 0 true
i’m getting a channel with a buffer of 6 and spawning 10 threads to shove 1200 items onto it. They all block once 6 items have been placed on it.
Doesn’t it stand to reason that the (def ch (async/chan 100)) from the original question is not the channel that is getting 1024+ items on it?
You are misunderstanding
While it is true that >!! will block a single thread to stop runaway queueing of pending operations, that assumes that you have a single thread
that’s why i started 10 threads each trying to put onto the channel and still see that it blocks
The most straightforward explanation for hitting the 1024 pending put limit is you have 1024 threads all using >!!
With 10 threads and proper back pressure, the most pending puts you'll have is 10
(proper back pressure here is assumed to come from >!! blocking the thread from continuing to put)
(let [c (a/chan 100)] (dotimes [t 1400] (a/thread (dotimes [n 1200] (println "thread: " t " item: " n (a/>!! c :value))))))
Exception in thread "async-thread-macro-1026" Exception in thread "async-thread-macro-1055" Exception in thread "async-thread-macro-1033" Exception in thread "async-thread-macro-1047" Exception in thread "async-thread-macro-1046" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
right you are! I see. My little experiment wasn’t thorough enough. thanks
Both >! and >!! end up using put! under the hood. They both perform a "put!" attempt, and if the channel is full or no consumer is ready to take a value, they enqueue the put! until the channel can accept it or a consumer is ready to take from it. The difference is that >! will enqueue and park, while >!! will enqueue and block.
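To make the "enqueue until a consumer is ready" part concrete, here is a minimal sketch using put! directly with a callback. The function name pending-put-demo and the promise are just for illustration; the sketch assumes an unbuffered channel with no taker waiting.

```clojure
(require '[clojure.core.async :as a])

(defn pending-put-demo
  "Illustrative helper (hypothetical name): enqueue a put on an unbuffered
  channel, then take from it and observe when the put's callback fires."
  []
  (let [c    (a/chan)   ; unbuffered: a put cannot complete without a taker
        done (promise)]
    ;; put! returns immediately; the value sits in the channel's pending-put queue
    (a/put! c :value (fn [accepted?] (deliver done accepted?)))
    (let [before (realized? done)             ; callback has not run yet
          taken  (a/<!! c)                    ; a taker arrives and gets the value
          result (deref done 1000 :timed-out)] ; callback runs once the put completes
      {:completed-before-take? before
       :taken                  taken
       :put-result             result})))
```

>!! does the same enqueue and then blocks the calling thread until that callback would fire, which is why one thread can only ever have one pending put.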
The 1024 put! limit means that no more than 1024 threads or go blocks can be waiting to put on a channel. If that happens, you're likely doing something wrong, because that's a lot of things waiting to be put on a channel. Either you're putting too much, or you're not taking enough. But if you actually want more than 1024 things queued up, then you should increase the buffer on the channel.
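A small sketch of where the limit kicks in, calling put! from a single thread with nobody taking. The helper name puts-before-limit and the 5000 safety cap are made up for this example; the expectation that the count comes out as buffer size plus 1024 is based on the limit quoted in the error message.

```clojure
(require '[clojure.core.async :as a])

(defn puts-before-limit
  "Illustrative helper (hypothetical name): count how many put! calls a
  channel with the given fixed buffer accepts, with no consumer, before
  core.async throws the pending-puts AssertionError."
  [buffer-size]
  (let [c (a/chan buffer-size)]
    (loop [accepted 0]
      (if (and (< accepted 5000)             ; safety cap so the loop always ends
               (try
                 (a/put! c :value)           ; fills the buffer, then queues as pending
                 true
                 (catch AssertionError _ false)))
        (recur (inc accepted))
        accepted))))

;; The buffer absorbs the first puts; only the ones after that are "pending".
(println (puts-before-limit 100))
```

So a bigger buffer raises how much can be queued in total, but the number of simultaneously waiting putters is still capped.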
thanks all, this has been helpful! i'm trying to figure out exactly why i have so many threads spinning up, but i did see the thread leaking in datadog. as an experiment, i've wrapped this code in a semaphore and it solves the issue in quite a blunt way, handling about 11k records/s without hitting that error.
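For anyone curious what the semaphore approach could look like, here is a rough sketch, not the actual code from the service. The names guarded-put! and run-demo and the permit count of 64 are assumptions for illustration. The idea is that producers wait on the java.util.concurrent.Semaphore instead of piling up as pending puts, so the channel never sees more waiting putters than there are permits.

```clojure
(require '[clojure.core.async :as a])
(import 'java.util.concurrent.Semaphore)

(defn guarded-put!
  "Hypothetical wrapper: at most (permits of sem) threads may be inside >!!
  at the same time; the rest wait on the semaphore."
  [^Semaphore sem ch record]
  (.acquire sem)
  (try
    (a/>!! ch record)
    (finally
      (.release sem))))

(defn run-demo
  "Start n-producers threads that each put one record through guarded-put!,
  drain the channel on the calling thread, and report what happened."
  [n-producers permits]
  (let [ch        (a/chan 100)
        sem       (Semaphore. permits)
        ;; mapv is eager, so every producer thread is started up front
        producers (mapv (fn [i]
                          (a/thread
                            (try
                              (guarded-put! sem ch i)
                              (catch Throwable _ :failed))))
                        (range n-producers))
        received  (loop [n 0]
                    (if (< n n-producers)
                      (do (a/<!! ch) (recur (inc n)))
                      n))
        results   (mapv a/<!! producers)]
    {:received received
     :failed   (count (filter #{:failed} results))}))

;; More producers than the 1024 pending-put limit, but only 64 at the channel at once.
(println (run-demo 1500 64))
```

This treats the symptom rather than the thread leak itself, which matches the "blunt" description above.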