core-async

Reut Sharabani 2024-04-24T17:13:21.579289Z

I have a pipeline that uses go "workers" (go loops) to process some tasks. I want to increase parallelism and it seems like Clojure go-based parallelism is limited to 1024 (the put buffer limit). Is there a way to increase that number? I'm not getting backpressure, but there may be times when there are more than 1024 concurrent puts from a single machine. Changing from go-loops to other methods is a big change for me, so I'm specifically interested in increasing the put buffer (safely) if possible. Thanks.

ghaskins 2024-04-30T13:46:08.733349Z

I was referring to the put! side. As far as I know, put! only hits the 1024 issue because of unmanaged backpressure.

Reut Sharabani 2024-04-30T13:52:21.585619Z

I don't understand what "unmanaged backpressure" can mean without taking into account the requirements of a system, but it also doesn't matter, since my question is whether there is a way to allow more pending operations. It's not about how to manage backpressure. Assuming at least some people have seen this question already and that they have much more knowledge than I have, I assume there isn't a simple/clean way to do it. So I'll use one of the hacky solutions I have.

ghaskins 2024-04-30T13:54:48.145339Z

“Unmanaged backpressure” would be blindly calling put! without considering if the call will block. If there is buffer space, the call will not block. If there is not buffer space, the call would have blocked. However, if you ignore this and keep putting more data in, you can only do this 1024 times before an error is thrown.
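
A minimal sketch of the failure described above, assuming a recent core.async release where the limit is 1024 and the error surfaces as an AssertionError on the thread that calls put!. The channel is unbuffered and has no consumer, so every put! stays pending; the names c and result are only illustrative.

```clojure
(require '[clojure.core.async :as a])

(def result
  (let [c (a/chan)]                 ; unbuffered, and nothing ever takes from it
    (try
      (dotimes [i 1025]             ; one more than the pending-put limit
        (a/put! c i))               ; each call queues a pending put and returns
      :no-error
      (catch AssertionError _       ; raised once the pending-put queue is full
        :limit-hit))))
```

Under those assumptions result ends up as :limit-hit, because the 1025th put! finds 1024 puts already queued.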

Reut Sharabani 2024-04-30T13:55:25.825279Z

I don't mind it blocking. It's not a problem. I mind the error.

ghaskins 2024-04-30T13:56:36.036949Z

If you don’t mind blocking, then use >!! instead of put!. You will not get the error on send. But you said your problem is receiving not sending

ghaskins 2024-04-30T13:57:08.431049Z

For that, perhaps try the pipeline-xx family for parallelization, rather than n go routines

Reut Sharabani 2024-04-30T13:57:13.229559Z

the problem is on both ends since I want to use more than 1024 to pass messages from one channel to another and it can error on 1024 pending ops on both ends

ghaskins 2024-04-30T13:57:30.473629Z

So, use pipeline-xx

Reut Sharabani 2024-04-30T13:58:00.045719Z

I'll have a look, do you know of any example? Or just the docs should do?

ghaskins 2024-04-30T13:58:11.106459Z

Docs are good

Reut Sharabani 2024-04-30T14:13:44.442649Z

It seems like it's not using different channels so I suspect it will have the same issue for me. It also spawns a fixed number of go-loop "workers" (probably for the same reason)

ghaskins 2024-04-30T14:14:28.450259Z

maybe…ive not tried to go higher than 1024 consumers

ghaskins 2024-04-30T14:14:57.012329Z

I have hit the > 1024 producers, which was what your OP was seemingly about. Apologies if this was not helpful

ghaskins 2024-04-30T14:15:51.870969Z

at a high level though, if doing parallelization against a core.async channel, pipeline-xx is the way to go

Reut Sharabani 2024-04-30T14:16:25.798049Z

I've attached the channel source code where the limitation is hard-coded so I don't think facilities around the channel will help for this matter. The channel implementation is where the limit is. I can work around it but it was just odd for me to find it in code with no way of setting it like the go thread pool size.

Reut Sharabani 2024-04-30T14:16:48.749739Z

@ghaskins I agree and I like it better than what I have, but I can't use it due to limitations

Reut Sharabani 2024-04-30T14:27:49.211529Z

just an example of hitting the same issue (expected):

(comment
  (let [parallelism 1030
        message-count 2000
        source (clojure.core.async/chan message-count)
        target (clojure.core.async/chan 1)]
    (doseq [x (range message-count)]
      (clojure.core.async/>!! source x))
    (clojure.core.async/pipeline-async parallelism source identity target)))

2024-04-30T20:20:52.287129Z

You should be able to use put! here as well unless I'm misunderstanding something. Because your buffer is big enough, all put! calls will be instant and the put! queue will be empty. Now the target channel needs to have a bigger buffer or it'll back pressure and you'll hit the error again. Why are you making the target buffer 1, I'm curious?

2024-04-30T21:38:08.771509Z

pipeline-async is extremely easy to get wrong

2024-04-30T21:39:06.442009Z

e.g. identity there is not a valid af argument, it needs to take two args
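
For reference, a minimal sketch of an af with the two-argument shape, assuming the documented signature (pipeline-async n to af from): af receives the input value and a result channel, must put its result(s) on that channel, and must close it. The name echo-af and the sizes 8 and 100 are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(defn echo-af
  "Hypothetical af: takes a value and a result channel, puts the value on
  the result channel unchanged, then closes it as pipeline-async expects."
  [v result]
  (a/go
    (a/>! result v)
    (a/close! result)))

(def out
  (let [from (a/to-chan! (range 100))   ; source channel, closes when drained
        to   (a/chan 100)]              ; target channel
    ;; argument order is n, to, af, from
    (a/pipeline-async 8 to echo-af from)
    ;; `to` is closed once `from` is exhausted, so into can complete
    (a/<!! (a/into [] to))))
```

Since pipeline-async preserves input order, out should hold the numbers 0 through 99 in order.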

2024-04-30T21:41:18.768239Z

the "parallelism" argument there isn't really parallelism, pipeline-async doesn't spin up threads to run in parallel

2024-04-30T21:42:26.382649Z

parallelism for pipeline-async determines how many go blocks it spins up to copy from a to b, basically

ghaskins 2024-04-30T21:43:47.852179Z

@hiredman agreed…i meant concurrency where I said parallelism above

ghaskins 2024-04-30T21:44:09.561669Z

just misspoke

2024-04-30T21:44:54.338909Z

and go blocks are not really a parallelism construct (in the sense of golang just being released and all the talks that were given about concurrency vs. parallelism)

2024-04-30T21:47:00.377499Z

by default the threadpool that services go blocks is limited to 8 threads, you can adjust this, but the reason it is 8 threads is 1 thread to run all the go block callbacks and stuff and 7 more just in case you do some stuff on the go threadpool that you really shouldn't and some of the threads are blocked

2024-04-30T22:44:33.472579Z

I thought the default is 42 now

2024-04-30T23:29:48.276449Z

Hum, what am I thinking of then that was increased

2024-04-30T23:50:58.173149Z

Ok, so I think the issue is that, when you have more than 1024 takers, like with an n bigger than 1024 in pipeline or pipeline-async, you basically end up with too many go blocks calling take on the channel and parking because they can't be saturated. Like it's true, you can't have more than 1024 pending takes on a channel, the solution would be to not have them be pending, but it'll be pretty tricky to make sure there is a constant stream and that it never goes under.

2024-05-01T00:55:12.776479Z

Conclusion, I don't think you can have more than 1024 consumers per channel at a time.
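
A small sketch of the consumer-side limit, assuming the same 1024 cap and AssertionError behaviour as on the put side. It uses take! on the calling thread rather than go blocks, purely so the error can be caught and inspected; take-result is an illustrative name.

```clojure
(require '[clojure.core.async :as a])

(def take-result
  (let [c (a/chan 9999)]            ; large buffer, but it is empty
    (try
      (dotimes [_ 1025]             ; one more than the pending-take limit
        (a/take! c (fn [_] nil)))   ; nothing to take, so each take is queued
      :no-error
      (catch AssertionError _       ; raised once the pending-take queue is full
        :limit-hit))))
```

The buffer size plays no role here: the takes are pending because the channel holds no values, which matches the point that the limit is on waiting consumers, not on buffered items.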

2024-05-01T01:01:15.623719Z

Unless you do something like:

(alter-var-root #'impl/MAX-QUEUE-SIZE (constantly 3000))
(require '[clojure.core.async :as a])
And you also need to make sure you're not AOT compiling your app, cause MAX-QUEUE-SIZE is set to const, so the var would get removed if you did

2024-05-01T01:05:52.160069Z

But... I think maybe it would be possible to have some kind of dynamic sizing. Using (count (.buf ch)) and if the channel contains more than 1024, you can spawn more go blocks, just not go-loop, so they would consume a single element and close themselves.

ghaskins 2024-04-29T12:12:52.739549Z

It seems to me that your fundamental problem is you are using put!, which is async, vs something like >!!, which is sync. If you want to do that, you have to manage back pressure to avoid the 1024 limit.

ghaskins 2024-04-29T12:17:58.633269Z

So try setting your channel buffer to what you want for queue depth, and then just switch to a parking or blocking put like >! or >!! and that might be all you need
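
A rough sketch of that suggestion, assuming a single producer thread and a single consumer thread; the buffer size of 16 and the message count of 5000 are arbitrary. With >!! the producer waits whenever the buffer is full, so it never has more than one pending put at a time.

```clojure
(require '[clojure.core.async :as a])

(def total
  (let [c        (a/chan 16)        ; buffer size = desired queue depth
        consumer (a/thread          ; real thread, so blocking takes are fine
                   (loop [sum 0]
                     (if-some [v (a/<!! c)]
                       (recur (+ sum v))
                       sum)))]      ; <!! returns nil once c is closed and drained
    (dotimes [i 5000]
      (a/>!! c i))                  ; blocks while the buffer is full
    (a/close! c)
    (a/<!! consumer)))              ; wait for the consumer's final sum
```

This only keeps the pending-put count low because there is one producer. Many threads blocked in >!! on the same full channel each count as a pending put, so more than 1024 of them can still reach the limit.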

2024-04-29T19:45:45.945519Z

Enqueuing more puts won't increase parallelism, or am I missing something? Queued puts mean that you're not processing them fast enough. Wouldn't you want to increase your takers to increase parallelism?

ghaskins 2024-04-29T19:50:25.415879Z

Right. The only way I am aware of to run afoul of the 1024 put! limit is because the channel is backpressured. So you either need to increase the consumption throughput or handle the back pressure

2024-04-29T23:03:47.340199Z

Yes, or if you want to enqueue tasks above 1024, I believe you should use a buffered channel and set a bigger buffer. But maybe pipeline-async doesn't give you control of that? I think if they meant they want higher concurrency, not parallelism, then I can see them wanting to have say 4000 concurrent operations even if they're all backed up, maybe you can start 4000 ops before the first one completes and that's fine for your use-case. But I think using put! is wrong then, you need to explicitly design that by having buffered channel.

Reut Sharabani 2024-04-30T04:56:43.568269Z

This example shows more than 1024 workers waiting for work on a channel and failing:

(comment
  (let [c (clojure.core.async/chan 9999)]
    (doseq [_ (range 1030)]
      (async/go
        (async/<! c)))))

No more than 1024 pending takes are allowed, which is hard-coded at https://github.com/clojure/core.async/blob/aa6b951301fbdcf5a13cdaaecb4b1a908dc8a978/src/main/clojure/clojure/core/async/impl/channels.clj#L239 and https://github.com/clojure/core.async/blob/aa6b951301fbdcf5a13cdaaecb4b1a908dc8a978/src/main/clojure/clojure/core/async/impl/channels.clj#L158, to my understanding. I am starting my workers with go-loop so there is a fixed number of workers running (it saves go block creation overhead). This has nothing to do with not handling backpressure (there is zero backpressure in this example), or buffer size (the buffer is empty in this example). I want to be able to "safely" run more than 1024 workers (producers or consumers) on the same channel. This means that at times I will have more than 1024 puts or takes on the same channel.

Reut Sharabani 2024-04-24T17:28:38.039669Z

I tried alter-var-root which didn't work:

(comment
  (do
    (alter-var-root #'clojure.core.async.impl.protocols/MAX-QUEUE-SIZE (constantly 2048))
    (let [c (async/chan 2)]
      (doseq [i (range 2000)]
        (async/thread (async/>!! c i))))))

Reut Sharabani 2024-04-24T17:44:04.575019Z

I see the same problem exists in consuming from channels, so I wonder how to increase parallelism on both ends and allow for more parallel go blocks to run.

mpenet 2024-04-24T18:50:26.878319Z

it's not, the pending take/put queue size on a channel is 1024. If you size your channel properly or apply backpressure you wouldn't hit that error.

mpenet 2024-04-24T18:51:48.331069Z

the buffer size is totally separate from the error you see

Reut Sharabani 2024-04-24T19:31:19.340679Z

I want to have more than 1024 pending puts and takes since I have more than 1024 go loops working on transferring items between channels

2024-04-24T19:57:49.548979Z

It is not always the case, but is very likely that if you are hitting the 1024 limit that you do have some kind of back pressure leak, which if you are using something like pipeline-async is very easy to do

2024-04-24T19:59:23.621139Z

So if you haven't, I would take hitting the 1024 limit as a warning and check your code to make sure you are not spinning up go loops in an unbounded way

2024-04-24T20:01:49.872279Z

For pipelines, internally they create channels with buffer sizes equal to the parallelism amount

2024-04-24T20:02:36.997349Z

When a put goes in the buffer it isn't counted as pending
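
To illustrate that last point, a minimal sketch assuming a fixed buffer of 2000 and no consumer at all. Every put! lands in the buffer immediately, so none of them is queued as pending and the 1024 limit is never reached. The names buffered and all-accepted? are illustrative.

```clojure
(require '[clojure.core.async :as a])

(def buffered (a/chan 2000))        ; fixed buffer with room for every value

(def all-accepted?
  ;; put! returns true when the channel is open; each value goes straight
  ;; into the buffer, so nothing is left waiting in the pending-put queue
  (every? true? (map #(a/put! buffered %) (range 2000))))
```

A further put! on the now-full buffer would be queued as pending, and only those queued puts count toward the limit.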

Reut Sharabani 2024-04-25T00:17:42.642189Z

I understand all of that and I still want to allow more pending puts and takes. For now I think I will override the implementation.