#core-async
2020-04-16
Alexis Vincent15:04:02

Is there a way to override MAX-QUEUE-SIZE for a particular chan? I have a case where I create a promise-chan that resolves to some status when a computation finishes, and more than 1024 listeners need to block until that status is delivered. The number of listeners scales with the complexity of our app and could reach 5000 pending takes; in another case it scales with the number of active connections in a partition and is unlikely to go above 2000. I want to be able to set these limits at a per-chan level. Setting this globally isn’t really a solution (although still better than nothing), since this is an internal lib and I don’t want to pass these limits on to the app level. It’s kind of OK for this to be some hack job that redefines MAX-QUEUE-SIZE with a patch. I’m thinking a (binding [clojure.core.async.impl.protocols/MAX-QUEUE-SIZE 2000] …) or something similar is what I’m after, but clojure.core.async.impl.protocols/MAX-QUEUE-SIZE isn’t dynamic, so that wouldn’t work. Any recommendations?
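For readers following along, the limit and the failed `binding` idea described in this question can be reproduced at a JVM REPL. This is a minimal sketch, assuming a recent core.async on the classpath; the names `status`, `overflow-error`, and `binding-error` are made up for illustration.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

;; A promise-chan that has not been delivered yet, so every take stays pending.
(def status (a/promise-chan))

;; The first 1024 pending takes are accepted.
(dotimes [_ 1024]
  (a/take! status (fn [_] nil)))

;; One more pending take is rejected with an AssertionError.
(def overflow-error
  (try
    (a/take! status (fn [_] nil))
    nil
    (catch AssertionError e e)))

;; MAX-QUEUE-SIZE is not a dynamic var, so `binding` refuses to rebind it.
(def binding-error
  (try
    (binding [impl/MAX-QUEUE-SIZE 2000] :unreachable)
    (catch IllegalStateException e e)))
```

Both `overflow-error` and `binding-error` should end up holding exceptions, which is the behaviour the question is trying to get around.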

Alex Miller (Clojure team)16:04:13

This is not currently configurable and that’s intentional as it’s generally not something people should run into

Alex Miller (Clojure team)16:04:29

Perhaps you should consider using an actual promise?

Alexis Vincent16:04:53

The code is heavily reliant on core.async and needs to be ClojureScript-compatible. For this case I might need to work around the issue using mults or something similar. But this is a pattern that is widely used in the codebase. As far as I can tell, there’s no reason an arbitrary upper limit should be set on takes of a chan when the quantity is well known and finite?

Alexis Vincent16:04:25

I do understand the motivation behind capping it low in general though.

noisesmith16:04:41

it exposes subtle bugs that happen under load, earlier

Alexis Vincent16:04:31

It’s also something I wouldn’t want to set globally in the software, precisely because it might delay finding bugs when careless code is written

noisesmith16:04:31

the pedantic answer to the scenario is someone should be applying backpressure or using a proper queue somewhere

Alexis Vincent16:04:50

right. Which is almost always the case

Alexis Vincent16:04:03

But in this case, that’s not what’s happening

Alexis Vincent16:04:50

Back pressure is carefully accounted for in this system; however, here we have a case where a finite, known quantity has to be held as pending takes rather than in a buffer

Alexis Vincent16:04:01

Imagine this value was 5: you would only be able to have 5 things sync around this point.

Alexis Vincent16:04:23

For a promise-chan, this makes little sense

Alexis Vincent16:04:49

@alexmiller Thanks for the response. Is there perhaps a way to simply make a different chan that would have this property, without copying the entire impl of ManyToManyChannel?

Alex Miller (Clojure team)17:04:09

channels implement some protocols, so you could go that route, but it's probably not trivial if you want channels as generic as m2m

Alex Miller (Clojure team)17:04:30

but maybe for this narrow case it would work

Alex Miller (Clojure team)17:04:26

look at timeout - that's a channel impl

ghadi17:04:53

timeout reuses m2m

Alex Miller (Clojure team)17:04:16

ah, well the relevant protocols are Channel, ReadPort, and WritePort

Alexis Vincent17:04:49

awesome thanks. Will post results back here

ghadi17:04:11

it's kind of subtle how to handle things, I would probably go for fixing your app with a mult before reimpl'ing chans
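A rough sketch of what the mult route might look like on the JVM: each listener taps its own channel, so no single channel ever holds more than one pending take. The names `status-in`, `status-mult`, and `status-listener` are hypothetical, and 3000 is just a number above the 1024 limit.

```clojure
(require '[clojure.core.async :as a])

;; Source channel for the status, fanned out through a mult.
(def status-in (a/chan))
(def status-mult (a/mult status-in))

(defn status-listener
  "Hypothetical helper: returns a fresh promise-chan tapped onto the mult."
  []
  (a/tap status-mult (a/promise-chan)))

;; Every listener gets its own channel, well past the 1024 pending-take limit.
(def listeners (vec (repeatedly 3000 status-listener)))

;; Deliver the status once, then close the source.
(a/>!! status-in :finished)
(a/close! status-in)

;; Each listener reads the value from its own tap.
(def results (mapv a/<!! listeners))
```

A listener that taps after the value has been distributed never sees it, so late arrivals need separate handling; that is part of what makes this subtle.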

Alexis Vincent17:04:46

Either going to use a mult or try the impl route

Alex Miller (Clojure team)17:04:52

there are lots of facilities to do this kind of thing in java.util.concurrent in the jvm, but I don't know the range of options in cljs

Alexis Vincent17:04:13

it’s mainly a case of having a heavy core.async lib (virtual connection multiplexer), and wanting to use the primitives we have to interact with promise-like things

Alexis Vincent17:04:38

also needs to play nice in a go block

Alexis Vincent17:04:52

Will let you know what I come up with

Alexis Vincent17:04:26

Although count this as a vote to add some super secret way to dynamise MAX-QUEUE-SIZE

Alexis Vincent17:04:50

No one will ever know

hiredman17:04:11

https://gist.github.com/hiredman/1788aa052f26d127c00a1679656026f0 is an example of extending ReadPort to CompletionStage (the CompletableFuture stuff in the jvm), which lets you use <! and alt! on a CompletableFuture, which doesn't have the channel limit, and you can do something similar in cljs with js promises

hiredman17:04:22

(I had the cljs version somewhere, but seem to have lost it)
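The gist itself is not reproduced here. The following is only a sketch of the general idea on the JVM, written under two assumptions: that core.async take handlers implement `java.util.concurrent.locks.Lock`, and that an exceptional completion may simply be handed to the taker as a value. A stage that completes with nil has nothing to deliver, which a taker sees as a closed channel.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl]
         '[clojure.core.async.impl.dispatch :as dispatch])

(import '(java.util.concurrent CompletableFuture CompletionStage)
        '(java.util.concurrent.locks Lock)
        '(java.util.function BiConsumer))

;; Let any CompletionStage be read with <!, <!! or alt!.
(extend-protocol impl/ReadPort
  CompletionStage
  (take! [stage handler]
    (.whenComplete ^CompletionStage stage
      (reify BiConsumer
        (accept [_ value error]
          ;; Commit the handler under its lock, as channels do.
          (let [^Lock lock handler
                _ (.lock lock)
                callback (and (impl/active? handler) (impl/commit handler))]
            (.unlock lock)
            (when callback
              ;; Assumption: an exception is delivered as a plain value.
              (dispatch/run #(callback (or error value))))))))
    ;; nil tells the caller the result will arrive through the callback.
    nil))

;; Usage: 3000 go blocks park on one future, with no pending-take limit.
(def status (CompletableFuture.))
(def waiters (mapv (fn [_] (a/go (a/<! status))) (range 3000)))
(.complete status :finished)
(def results (mapv a/<!! waiters))
```

Because the waiters are registered on the future rather than on a channel, the 1024 pending-take cap does not come into play.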

Alexis Vincent19:04:59

@hiredman This is cool, thanks!

Alexis Vincent23:04:34

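;; Aliases assumed: a = clojure.core.async,
;; impl = clojure.core.async.impl.protocols,
;; dispatch = clojure.core.async.impl.dispatch
(defn big-promise []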
  (let [p (a/promise-chan)
        c (a/chan)
        m (a/mult c)]

    (a/go
      (let [v (a/<! p)]
        (when v
          (a/>! c v))
        (a/close! c)))

    (reify
      impl/ReadPort
      (take! [this handler]
        (a/go
          ;; not thread safe but... yolo
          (let [v (or (a/poll! p)
                      (let [c' (a/chan)]
                        (a/tap m c')
                        (a/<! c')))]

            #?(:clj
               (.lock handler))
            (let [good (and (impl/active? handler)
                            (impl/commit handler))]
              #?(:clj
                 (.unlock handler))
              (when good
                (dispatch/run #(good v))))))
        nil)

      impl/WritePort
      (put! [this v handler]
        (impl/put! p v handler))

      impl/Channel
      (close! [this]
        (impl/close! p))
      (closed? [this]
        (impl/closed? p)))))

Alexis Vincent23:04:12

@hiredman thanks for the gist!

hiredman23:04:30

I strongly recommend you not do that, and instead go with the extension of ReadPort to CompletionStage, and if you really must, extend WritePort to it as well. That will give you something that is kind of good and thread-safe instead of whatever this is