I have a pipeline that uses go "workers" (go loops) to process some tasks. I want to increase parallelism, and it seems like Clojure go-based parallelism is limited to 1024 (the put buffer limit). Is there a way to increase that number? I'm not getting backpressure, but there may be times when there are more than 1024 concurrent puts from a single machine. Changing from go loops to other methods is a big change for me, so I'm specifically interested in increasing the put buffer (safely) if possible. Thanks.
I was referring to the put! side. As far as I know, put! only hits the 1024 issue because of unmanaged backpressure.
I don't understand what "unmanaged backpressure" can mean without taking into account the requirements of a system, but it also doesn't matter, since my question is whether there is a way to allow more pending operations. It's not about how to manage backpressure. Assuming at least some people have seen this question already and that they have much more knowledge than I do, I assume there isn't a simple, clean way to do it. So I'll use one of the hacky solutions I have.
“Unmanaged backpressure” would be blindly calling put! without considering whether the call would block. If there is buffer space, the value goes straight into the buffer and nothing would block. If there is no buffer space, a blocking put would have waited; put! instead queues a pending put and returns immediately. If you ignore this and keep putting more data in, you can only do this 1024 times before an error is thrown.
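A minimal sketch of the behaviour described above, assuming the default limit of 1024 (count-accepted-puts is a hypothetical helper): values that fit in the buffer are accepted immediately, and past that put! queues pending puts until the next one throws an AssertionError.

```clojure
(require '[clojure.core.async :as a])

(defn count-accepted-puts
  "Hypothetical helper: calls put! with no taker until core.async throws.
  Returns how many put! calls were accepted, either into the buffer or as
  pending puts. A nil buf-size means an unbuffered channel."
  [buf-size max-attempts]
  (let [c (if buf-size (a/chan buf-size) (a/chan))]
    (loop [i 0]
      (if (and (< i max-attempts)
               ;; put! never blocks; past the pending limit it throws AssertionError
               (try (a/put! c i) true
                    (catch AssertionError _ false)))
        (recur (inc i))
        i))))

(count-accepted-puts nil 2000) ; => 1024 (all pending)
(count-accepted-puts 100 2000) ; => 1124 (100 buffered + 1024 pending)
```

Buffered values are not counted as pending, which is why a bigger buffer pushes the error further out without changing the 1024 limit itself.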
I don't mind it blocking. It's not a problem. I mind the error.
If you don’t mind blocking, then use >!! instead of put!. You will not get the error on send. But you said your problem is receiving, not sending.
For that, perhaps try the pipeline-xx family for parallelization, rather than n go routines.
The problem is on both ends: I want to use more than 1024 go loops to pass messages from one channel to another, and that can error at 1024 pending ops on either end.
So, use pipeline-xx
I'll have a look. Do you know of any example, or should the docs be enough?
Docs are good
It seems like it doesn't use separate channels, so I suspect it will have the same issue for me. It also spawns a fixed number of go-loop "workers" (probably for the same reason).
Maybe… I've not tried to go higher than 1024 consumers.
I have hit the > 1024 producers limit, which is what your OP seemed to be about. Apologies if this was not helpful.
At a high level, though, if doing parallelization against a core.async channel, pipeline-xx is the way to go.
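For a CPU-bound step, a minimal sketch of pipeline, one member of that family (square-all is a hypothetical name, the buffer size of 16 is arbitrary, and to-chan! assumes core.async 1.2 or later). The argument order is n, to, xf, from, and outputs come back in input order.

```clojure
(require '[clojure.core.async :as a])

(defn square-all
  "Squares xs through core.async/pipeline with up to n workers."
  [n xs]
  (let [from (a/to-chan! xs)   ; closes itself after the last element
        to   (a/chan 16)]
    ;; pipeline closes `to` once `from` is closed and fully processed
    (a/pipeline n to (map #(* % %)) from)
    (a/<!! (a/into [] to))))

(square-all 4 (range 1 6)) ; => [1 4 9 16 25]
```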
I've attached the channel source code where the limitation is hard-coded, so I don't think facilities built around the channel will help here: the limit lives in the channel implementation itself. I can work around it, but it was odd to find it hard-coded with no way of setting it, unlike the go thread pool size.
@ghaskins I agree and I like it better than what I have, but I can't use it due to limitations
Just an example of hitting the same issue (expected):
```clojure
(comment
  (let [parallelism   1030
        message-count 2000
        source        (clojure.core.async/chan message-count)
        target        (clojure.core.async/chan 1)]
    (doseq [x (range message-count)]
      (clojure.core.async/>!! source x))
    (clojure.core.async/pipeline-async parallelism source identity target)))
```
You should be able to use put! here as well, unless I'm misunderstanding something. Because your buffer is big enough, every put! completes immediately and the pending-put queue stays empty. The target channel, however, needs a bigger buffer, or it will apply backpressure and you'll hit the error again. I'm curious: why are you giving target a buffer of 1?
pipeline-async is extremely easy to get wrong
e.g. identity is not a valid af argument there; af needs to take two args (the input value and a channel for the result).
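A minimal sketch of a valid af (async-inc and inc-all are hypothetical names; to-chan! assumes core.async 1.2 or later). Per the pipeline-async docstring, af receives the input value and a result channel, should return immediately after launching asynchronous work, and that work must put its result(s) on the channel and then close it. The argument order is n, to, af, from, so the call in the example above also passes source as the output and target as the input.

```clojure
(require '[clojure.core.async :as a])

(defn async-inc
  "An af for pipeline-async: returns immediately (the go block runs
  asynchronously), puts the result on result-ch, then closes it."
  [v result-ch]
  (a/go
    (a/>! result-ch (inc v))
    (a/close! result-ch)))

(defn inc-all
  "Runs async-inc over xs through pipeline-async with concurrency n."
  [n xs]
  (let [from (a/to-chan! xs)
        to   (a/chan 16)]
    (a/pipeline-async n to async-inc from)
    (a/<!! (a/into [] to))))

(inc-all 3 [1 2 3 4]) ; => [2 3 4 5]
```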
The "parallelism" argument there isn't really parallelism; pipeline-async doesn't spin up threads to run in parallel.
For pipeline-async, the parallelism argument basically determines how many go blocks it spins up to copy from a to b.
@hiredman Agreed… I meant concurrency where I said parallelism above.
just misspoke
And go blocks are not really a parallelism construct (in the sense of all the concurrency vs. parallelism talks that were given when golang was first released).
By default, the thread pool that services go blocks is limited to 8 threads. You can adjust this, but the reason it is 8 threads is: 1 thread to run all the go block callbacks and such, and 7 more just in case you do things on the go thread pool that you really shouldn't and some of the threads get blocked.
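The go-block pool size can be set through the clojure.core.async.pool-size JVM system property. A minimal deps.edn sketch, where the :big-pool alias name and the value 16 are arbitrary examples; passing it as a JVM option ensures it is in place before core.async creates the pool.

```clojure
;; deps.edn (fragment); start with: clj -A:big-pool
{:aliases
 {:big-pool {:jvm-opts ["-Dclojure.core.async.pool-size=16"]}}}
```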
I thought the default is 42 now
Hmm, then what am I thinking of that was increased?
OK, so I think the issue is this: when you have more than 1024 takers, as with an n bigger than 1024 in pipeline or pipeline-async, you end up with too many go blocks calling take on the channel and parking, because there isn't enough incoming data to keep them all busy. It's true that you can't have more than 1024 pending takes on a channel. The solution would be to keep them from being pending, but it would be pretty tricky to guarantee a constant stream of input that never falls below the number of takers.
Conclusion: I don't think you can have more than 1024 consumers per channel at a time.
Unless you do something like:
```clojure
(require '[clojure.core.async.impl.protocols :as impl])
;; only takes effect if this runs before clojure.core.async (and its
;; channel implementation) is loaded and compiled
(alter-var-root #'impl/MAX-QUEUE-SIZE (constantly 3000))
(require '[clojure.core.async :as a])
```
You also need to make sure you're not AOT compiling your app, because MAX-QUEUE-SIZE is declared as a const, so its value gets inlined at compile time and the var would be bypassed if you did.
But... I think it might be possible to have some kind of dynamic sizing: using (count (.buf ch)), if the channel contains more than 1024 items, you could spawn more go blocks (just not go-loops) that each consume a single element and then exit.
It seems to me that your fundamental problem is that you are using put!, which is async, vs. something like >!!, which is sync. If you want to do that, you have to manage backpressure to avoid the 1024 limit.
So try setting your channel buffer to the queue depth you want, and then just switch to a put that waits, like the parking >! (inside a go block) or the blocking >!!, and that might be all you need.
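A minimal sketch of that approach with a single producer (the sum-through-channel name and the buffer size of 10 are arbitrary). Each blocked >!! still counts as one pending put, so this only avoids the error as long as no more than 1024 producers are waiting on the channel at once.

```clojure
(require '[clojure.core.async :as a])

(defn sum-through-channel
  "Pushes (range n) through a small buffered channel with a blocking >!!.
  The producer waits whenever the buffer is full, so it never has more
  than one pending put."
  [n]
  (let [c        (a/chan 10)
        ;; a/thread returns a channel that yields the body's result
        consumer (a/thread
                   (loop [acc 0]
                     (if-some [v (a/<!! c)]
                       (recur (+ acc v))
                       acc)))]
    (doseq [i (range n)]
      (a/>!! c i))
    (a/close! c)
    (a/<!! consumer)))

(sum-through-channel 5000) ; => 12497500
```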
Enqueuing more puts won't increase parallelism, or am I missing something? Queued puts mean that you're not processing them fast enough. Wouldn't you want to increase your takers to increase parallelism?
Right. The only way I am aware of to run afoul of the 1024 put! limit is because the channel is backpressured. So you either need to increase the consumption throughput or handle the back pressure
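If losing data is acceptable, one way to handle the backpressure is a windowed buffer: sliding-buffer (and dropping-buffer) never report themselves as full, so put! never leaves a pending put behind. A minimal sketch with a hypothetical keep-latest function; note that it silently discards everything but the last n values.

```clojure
(require '[clojure.core.async :as a])

(defn keep-latest
  "put!s every value of xs onto a channel with a sliding buffer of size n.
  The sliding buffer drops its oldest value instead of filling up, so no
  put is ever left pending. Returns the values that survived."
  [n xs]
  (let [c (a/chan (a/sliding-buffer n))]
    (doseq [x xs]
      (a/put! c x))
    (a/close! c)   ; buffered values can still be taken after close
    (a/<!! (a/into [] c))))

(keep-latest 3 (range 5000)) ; => [4997 4998 4999]
```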
Yes, or if you want to enqueue more than 1024 tasks, I believe you should use a buffered channel with a bigger buffer. But maybe pipeline-async doesn't give you control over that? If they meant they want higher concurrency, not parallelism, then I can see them wanting, say, 4000 concurrent operations even if they're all backed up: maybe you want to start 4000 ops before the first one completes, and that's fine for your use case. But then I think using put! is wrong; you need to design for that explicitly with a buffered channel.
This example shows more than 1024 workers waiting for work on a channel and failing:
```clojure
(comment
  (let [c (clojure.core.async/chan 9999)]
    (doseq [_ (range 1030)]
      (async/go
        (async/<! c)))))
```
As far as I understand, no more than 1024 pending takes are allowed; the limit is hard-coded at https://github.com/clojure/core.async/blob/aa6b951301fbdcf5a13cdaaecb4b1a908dc8a978/src/main/clojure/clojure/core/async/impl/channels.clj#L239 and https://github.com/clojure/core.async/blob/aa6b951301fbdcf5a13cdaaecb4b1a908dc8a978/src/main/clojure/clojure/core/async/impl/channels.clj#L158.
I start my workers with go-loop, so there is a fixed number of workers running (it saves go-block creation overhead).
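A minimal sketch of that fixed-pool pattern, with a hypothetical start-workers! helper (to-chan! assumes core.async 1.2 or later). Each idle worker is one pending take on the input channel, so n above 1024 would trip the pending-take assertion whenever the input runs dry.

```clojure
(require '[clojure.core.async :as a])

(defn start-workers!
  "Hypothetical helper: starts n go-loop workers that take from in, apply f,
  and put the result on out. Closes out once in is closed and every worker
  has finished. Returns out."
  [n f in out]
  (let [workers (doall
                 (for [_ (range n)]
                   (a/go-loop []
                     (when-some [x (a/<! in)]
                       (a/>! out (f x))
                       (recur)))))]
    (a/go
      (doseq [w workers]
        (a/<! w))   ; a go-loop's channel closes when the loop exits
      (a/close! out))
    out))

;; Results arrive in arbitrary order; sorted they are 1 through 100.
(sort (a/<!! (a/into [] (start-workers! 4 inc (a/to-chan! (range 100)) (a/chan 10)))))
```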
This has nothing to do with not handling backpressure (there is zero backpressure in this example), or buffer size (buffer is empty in this example).
I want to be able to "safely" run more than 1024 workers (producers or consumers) on the same channel. This means that at times I will have more than 1024 pending puts or takes on the same channel.
I tried alter-var-root, which didn't work:
```clojure
(comment
  (do
    (alter-var-root #'clojure.core.async.impl.protocols/MAX-QUEUE-SIZE (constantly 2048))
    (let [c (async/chan 2)]
      (doseq [i (range 2000)]
        (async/thread (async/>!! c i))))))
```
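A likely explanation for why that attempt fails, as a toy sketch: MAX-QUEUE-SIZE is declared ^:const, and the Clojure compiler inlines a const var's current value into code at compile time. The channel implementation was already compiled with 1024 when the alter-var-root above ran, so only code compiled afterwards sees the new value (which is also why AOT compilation defeats the alter-before-require trick). The limit var and both functions below are hypothetical stand-ins.

```clojure
;; Hypothetical stand-in for MAX-QUEUE-SIZE, also declared ^:const.
(def ^:const limit 1024)

;; Compiled now: the current value 1024 is inlined into the body.
(defn under-limit? [n] (< n limit))

(alter-var-root #'limit (constantly 2048))

;; Compiled after alter-var-root: inlines the new value 2048.
(defn under-new-limit? [n] (< n limit))

(under-limit? 1500)     ; => false, still compares against 1024
(under-new-limit? 1500) ; => true
```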
I see the same problem exists in consuming from channels, so I wonder how to increase parallelism on both ends and allow for more parallel go blocks to run.
It's not; the pending take/put queue size on a channel is 1024. If you size your channel properly or apply backpressure, you won't hit that error.
The buffer size is totally separate from the error you see.
I want to have more than 1024 pending puts and takes since I have more than 1024 go loops working on transferring items between channels
It is not always the case, but it is very likely that if you are hitting the 1024 limit, you do have some kind of backpressure leak, which is very easy to do if you are using something like pipeline-async.
So if you haven't already, I would take hitting the 1024 limit as a warning and check your code to make sure you are not spinning up go loops in an unbounded way.
For pipelines, internally they create channels with buffer sizes equal to the parallelism amount
When a put goes in the buffer it isn't counted as pending
I understand all of that, and I still want to allow more pending puts and takes. For now, I think I will override the implementation.