core-async

jmv 2024-11-21T22:24:01.947869Z

hello! i think i'm misunderstanding something about core.async and was hoping someone could point out what i'm missing. i have a channel with a fixed buffer, defined like (def ch (async/chan 100)), and a single thread consuming from kafka and putting records onto the channel with (async/>!! ch some-record). ch is consumed by other threads, but i'm hitting the error below, which makes it sound like >!! isn't blocking the way i understand it should.

java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer. 
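The error message suggests a windowed buffer. As a rough illustration of what that changes, here is a minimal sketch, assuming core.async is on the classpath (e.g. via the :async alias used later in this thread); sliding-demo is a hypothetical name. With a sliding buffer, >!! never waits: when the buffer is full, the oldest item is dropped, so no pending puts ever accumulate. The catch is that items are silently discarded, which is probably not acceptable for Kafka records.

```clojure
(require '[clojure.core.async :as a])

(defn sliding-demo
  "Hypothetical helper: puts n items with >!! onto a channel backed by a
  100-slot sliding buffer that nobody reads, then drains what is left."
  [n]
  (let [c (a/chan (a/sliding-buffer 100))]
    (dotimes [i n]
      ;; never blocks: once 100 items are buffered, the oldest is dropped
      (a/>!! c i))
    ;; poll! takes a value only if one is immediately available, else nil
    (loop [acc []]
      (if-some [v (a/poll! c)]
        (recur (conj acc v))
        acc))))
```

Calling (sliding-demo 5000) should leave only the newest 100 items, 4900 through 4999.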

dpsutton 2024-11-21T22:33:46.851499Z

❯ clj -A:async
Clojure 1.12.0
user=> (require '[clojure.core.async :as a])
user=> (let [c (a/chan 100)] (dotimes [n 1200] (println n (a/>!! c :value))))
0 true
1 true
2 true
...
97 true
98 true
99 true

And now it’s blocked there; I’m unable to put any more items onto the channel.
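For contrast with the blocked REPL above, a minimal sketch of the single-producer setup described in the question, assuming one producer thread and one consumer thread (single-producer-run is a hypothetical name, and a plain integer stands in for a Kafka record). Because >!! blocks the producer whenever the buffer is full, there is at most one pending put at any moment, far below the 1024 limit, no matter how many items flow through.

```clojure
(require '[clojure.core.async :as a])

(defn single-producer-run
  "Hypothetical helper: one producer puts n items with >!! onto a
  100-slot channel while one consumer thread drains and counts them."
  [n]
  (let [c (a/chan 100)
        ;; a/thread returns a channel that receives the body's result
        consumer (a/thread
                   (loop [cnt 0]
                     (if (some? (a/<!! c))
                       (recur (inc cnt))
                       cnt)))]            ; nil means closed and drained
    (dotimes [i n]
      (a/>!! c i))                        ; blocks while the buffer is full
    (a/close! c)
    (a/<!! consumer)))
```

(single-producer-run 5000) should return 5000 without ever tripping the assertion.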

2024-11-21T22:33:47.312629Z

How do you know it is a single thread publishing to the channel?

2024-11-21T22:35:51.804039Z

Like, are you using some API to pull a single message from Kafka, using >!! to put it on the channel, then looping?

jmv 2024-11-21T22:37:59.753159Z

Effectively, yes. I have a Kafka consumer record moving through a Pedestal interceptor chain, and it must finish before the next record is processed.

jmv 2024-11-21T22:39:18.844009Z

I should confirm it’s not going async, but if a ton of threads are executing >!!, they should all just block, right? Or would that trigger this error?

2024-11-21T22:39:35.474979Z

That would trigger this error

jmv 2024-11-21T22:40:37.778719Z

Ah ok! I will double check I’m not accidentally triggering async execution in Pedestal then

2024-11-21T22:40:44.298359Z

The limit is on the number of pending operations; it doesn't distinguish between "parking" and "blocking".
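To see that parked puts count toward the same limit as blocked ones, here is a rough sketch using go blocks instead of threads. It assumes the 1024 limit from the error message; parked-put-failures is a hypothetical name. Each go block parks on >! against an unbuffered channel that nobody reads; the first 1024 stay parked and every later one should hit the assertion, which is caught inside the go block and counted. Go blocks run asynchronously, so the function polls the counter until it reaches the expected value or the timeout passes.

```clojure
(require '[clojure.core.async :as a])

(defn parked-put-failures
  "Hypothetical helper: starts n go blocks that each park on >! against
  an unread, unbuffered channel, and returns how many of them hit the
  pending-put assertion (waiting up to timeout-ms for them to run)."
  [n timeout-ms]
  (let [c (a/chan)                        ; unbuffered, never read
        failures (atom 0)]
    (dotimes [_ n]
      (a/go
        (try
          (a/>! c :value)                 ; parks forever if it enqueues
          (catch AssertionError _
            (swap! failures inc)))))
    ;; assumes the 1024 limit: anything past it should fail
    (let [expected (- n 1024)
          deadline (+ (System/currentTimeMillis) timeout-ms)]
      (loop []
        (if (or (>= @failures expected)
                (> (System/currentTimeMillis) deadline))
          @failures
          (do (Thread/sleep 10) (recur)))))))
```

With 1100 go blocks, (parked-put-failures 1100 5000) should report 76 failures: 1024 parked puts plus 76 rejected ones.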

jmv 2024-11-21T22:41:12.812209Z

Ooo thanks that’s helpful

dpsutton 2024-11-21T22:43:18.939439Z

i’m doing a simulation and it doesn’t line up with that?

user=> (let [c (a/chan 6)] (dotimes [t 10] (a/thread (dotimes [n 1200] (println "thread: " t " item: " n (a/>!! c :value))))))
nil
user=> thread: thread: thread:   thread:  thread:   thread:  5 9  item:  item:  0 4 true item:
0  item:  0 true
 8  item:  0 true
1  0 0true
 true item:  0 true

i’m creating a channel with a buffer of 6 and spawning 10 threads that each try to shove 1200 items onto it. They all block once 6 items have been placed on it. Doesn’t it stand to reason, then, that the (def ch (async/chan 100)) from the original question is not the channel that is getting 1024+ pending puts?

2024-11-21T22:45:23.301279Z

You are misunderstanding

2024-11-21T22:46:18.328319Z

While it is true that >!! blocks the calling thread to stop runaway queueing of pending operations, that only helps if you have a single thread; every blocked thread still counts as one pending put.

dpsutton 2024-11-21T22:47:08.837659Z

that’s why i started 10 threads each trying to put onto the channel and still see that it blocks

2024-11-21T22:47:25.141819Z

The most straightforward explanation for hitting the 1024 pending put limit is you have 1024 threads all using >!!

2024-11-21T22:49:13.432859Z

With 10 threads and proper back pressure, the most pending puts you'll have is 10

2024-11-21T22:50:33.147959Z

(proper back pressure here is assumed to come from >!! blocking the thread from continuing to put)

dpsutton 2024-11-21T22:50:48.113339Z

(let [c (a/chan 100)] (dotimes [t 1400] (a/thread (dotimes [n 1200] (println "thread: " t " item: " n (a/>!! c :value))))))
Exception in thread "async-thread-macro-1026" Exception in thread "async-thread-macro-1055" Exception in thread "async-thread-macro-1033" Exception in thread "async-thread-macro-1047" Exception in thread "async-thread-macro-1046" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer

dpsutton 2024-11-21T22:51:04.460929Z

right you are! I see. My little experiment wasn’t thorough enough. thanks

2024-11-21T23:30:45.866829Z

Both >! and >!! end up calling the channel's put! operation under the hood. They both attempt the put, and if it can't complete immediately (the buffer is full, or, for an unbuffered channel, no consumer is waiting to take), they enqueue a pending put until the channel can accept the value or a consumer is ready to take it. The difference is that >! enqueues and parks the go block, while >!! enqueues and blocks the thread.

2024-11-21T23:38:54.056069Z

The 1024 pending-put limit means that no more than 1024 threads or go blocks can be waiting to put on a single channel. If you hit it, you're likely doing something wrong, because that's a lot of things waiting to be put on a channel: either you're putting too much, or you're not taking enough. But if you actually want more than 1024 things to be queued up, then you should increase the buffer on the channel.
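One way to check how the buffer and the pending-put limit add up is a single-threaded sketch using put!, which never blocks or parks but still registers a pending put when the channel is full. This assumes the 1024 limit from the error message; count-puts-until-limit is a hypothetical name. On a channel nobody reads, put! should succeed buf-size times into the buffer, then 1024 more times as pending puts, and then throw the AssertionError.

```clojure
(require '[clojure.core.async :as a])

(defn count-puts-until-limit
  "Hypothetical helper: calls put! on an unread channel with a fixed
  buffer of buf-size and returns how many calls succeeded before the
  pending-put assertion fired."
  [buf-size]
  (let [c (a/chan buf-size)]
    (loop [n 0]
      ;; put! returns true whether the value was buffered or queued
      (let [ok? (try
                  (a/put! c :value)
                  true
                  (catch AssertionError _ false))]
        (if ok?
          (recur (inc n))
          n)))))
```

(count-puts-until-limit 100) should return 1124 (100 buffered plus 1024 pending), and a buffer of 1000 should raise that to 2024, which is why a bigger buffer is the knob to turn if deep queueing is really intended.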

jmv 2024-11-22T02:12:07.102319Z

thanks all, this has been helpful! i'm still trying to figure out exactly why so many threads are spinning up, but i did see the thread leak in datadog. as an experiment, i've wrapped this code in a semaphore, which solves the issue in quite a blunt way; it handles about 11k records/s without hitting that error.
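The actual semaphore code isn't shown in the thread, so the following is only a minimal sketch of the idea, assuming a java.util.concurrent.Semaphore is acquired around each >!!. bounded-put!! and run-many-producers are hypothetical names, and the producer threads stand in for whatever is spawning threads in the real application. The semaphore caps how many callers can be pending inside >!! at once (here 64, well under 1024), but the extra threads still exist and wait on the semaphore instead, so it hides the thread growth rather than fixing it.

```clojure
(require '[clojure.core.async :as a])
(import '(java.util.concurrent Semaphore))

(defn bounded-put!!
  "Hypothetical helper: blocking put that first takes a permit from sem,
  so at most (number of permits) callers are ever waiting inside >!!."
  [^Semaphore sem ch v]
  (.acquire sem)
  (try
    (a/>!! ch v)
    (finally
      (.release sem))))

(defn run-many-producers
  "Hypothetical helper: starts n producer threads, each doing one bounded
  put onto a 100-slot channel, plus one consumer that counts items."
  [n permits]
  (let [ch (a/chan 100)
        sem (Semaphore. (int permits))
        producers (doall (for [i (range n)]
                           (a/thread (bounded-put!! sem ch i))))
        consumer (a/thread
                   (loop [cnt 0]
                     (if (some? (a/<!! ch))
                       (recur (inc cnt))
                       cnt)))]
    ;; wait for every producer to finish, then close so the consumer ends
    (doseq [p producers]
      (a/<!! p))
    (a/close! ch)
    (a/<!! consumer)))
```

For example, (run-many-producers 1200 64) should deliver all 1200 items without the assertion firing, even though more than 1124 threads are trying to put.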