Fork me on GitHub
#core-async
<
2021-08-09
>
nha09:08:37

Hello allย ๐Ÿ™‚ I am seeing this in the logs of one of my apps:

java.lang.AssertionError: Assert failed: No more than 1024 pending takes are allowed on a single channel.
 (< (.size takes) impl/MAX-QUEUE-SIZE)" exceptionType=java.lang.AssertionError
This seems the likely culprit:
(defn consume-channels [channels result-chan]
  (doseq [out channels] 
    (async/thread
      (while
          (if-let [msg (async/<!! out)]
            (do (async/>!! res-ch (do-something msg))
                true)
            false)))))
This is supposed to consume messages from a list of channels until the channels are closed. do-something is doing some io and takes ~10 seconds. How should this be rewritten to avoid the above error?

Ben Sless10:08:10

separate consuming from channels and processing first, i.e., async/merge Then do your IO in pipeline blocking downstream Second, I doubt this is the culprit. Why? The assertion tells you you have enqueued too many takes on one channel. In your function you start a new thread per channel and only one thread is taking from the channel, so per thread there is no more than one enqueued take on the channel

Ben Sless10:08:59

The problem is probably somewhere where you call take! , <! or <!! many times on one channel

nha10:08:38

I see, that makes sense, I will need to dig down where other take are done then. Thanks a lot

Ben Sless10:08:13

Welcome. do you have a call stack? maybe it will help

nha10:08:58

Yes, it points to the underlying throttled channel (https://github.com/brunoV/throttler) But this is only used in another place that I can see, which periodically checks that the channels can be created (a healthcheck)

nha10:08:45

I should have mentioned, the input argument "channels" are throttled channels.

Ben Sless10:08:30

If you can paste the entire error stack trace

nha11:08:13

of course:

2021-08-09T11:15:59.761+0100 ERROR Uncaught exception on async-dispatch-8571
java.lang.AssertionError: Assert failed: No more than 1024 pending takes are allowed on a single channel.
(< (.size takes) impl/MAX-QUEUE-SIZE)
	at clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
	at clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
	at clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
	at throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214.invoke(core.clj:38)
	at throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212.invoke(core.clj:35)
	at clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
	at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
	at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
	at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
	at clojure.core.async.impl.ioc_macros$take_BANG_$fn__11362.invoke(ioc_macros.clj:991)
	at clojure.core.async.impl.channels.ManyToManyChannel$fn__6980.invoke(channels.clj:265)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at clojure.core.async.impl.concurrent$counted_thread_factory$reify__6818$fn__6819.invoke(concurrent.clj:29)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:748)
2021-08-09T11:15:59.761+0100 INFO exceptionCaller=throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214# exceptionCallerClass=throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214 exceptionCallerLineNo=38 exceptionCallerMethod=invoke exceptionData=null exceptionMessage="Assert failed: No more than 1024 pending takes are allowed on a single channel.

Ben Sless11:08:13

Hard to say where the bug is, exactly, but I think you should open an issue to the library

Ben Sless11:08:28

Did you set a bucket size?

nha11:08:51

yes, though the bucket size is computed and might be too big (> 1024)

nha11:08:06

so I will check that first ๐Ÿ™‚ thanks a lot for you help

nha11:08:30

oh. what is bucketing? I suspect I donโ€™t need it, just need throttling

Ben Sless11:08:07

bucketing is more tolerant throttling, lets you send bursts

nha11:08:09

ah you mean the bucketing algorithm I guess

nha11:08:30

yes I see ๐Ÿ™‚ not sure about this since it is for an external API and the limits there are not clear

Ben Sless11:08:48

They never are ๐Ÿ™‚

nha11:08:52

but Iโ€™ll keep that in mind. Thanks a lot for your help!

nha11:08:54

ahah indeed ๐Ÿ˜›

Ben Sless11:08:07

you can connect it with a circuit breaker if you're not sure

Ben Sless11:08:34

then connect it with diehard to send negative feedback

nha11:08:51

very interesting! ๐Ÿ™‚ I will check more.async

๐Ÿ™ƒ 4
Ben Sless11:08:45

breaker-pipe -> rate-limiter -> (IO + CB) Close over the CB in the function you pass to breaker-pipe, then if you trip it, the breaker will open up stream

nha11:08:07

๐Ÿ™‚ sounds good to me