This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2021-08-09
Channels
- # announcements (5)
- # aws (5)
- # babashka (7)
- # beginners (152)
- # cider (10)
- # clj-kondo (30)
- # clj-on-windows (1)
- # cljs-dev (14)
- # cljsrn (19)
- # clojure (94)
- # clojure-australia (4)
- # clojure-europe (43)
- # clojure-nl (2)
- # clojure-uk (11)
- # clojurescript (16)
- # clojureverse-ops (5)
- # code-reviews (7)
- # community-development (6)
- # core-async (29)
- # cursive (50)
- # datomic (22)
- # docker (10)
- # figwheel-main (3)
- # fulcro (4)
- # graalvm (1)
- # introduce-yourself (2)
- # kaocha (9)
- # lambdaisland (2)
- # lsp (19)
- # malli (37)
- # off-topic (50)
- # polylith (8)
- # portal (1)
- # reagent (10)
- # rum (1)
- # shadow-cljs (24)
- # spacemacs (14)
- # yada (2)
Hello allย ๐ I am seeing this in the logs of one of my apps:
java.lang.AssertionError: Assert failed: No more than 1024 pending takes are allowed on a single channel.
(< (.size takes) impl/MAX-QUEUE-SIZE)" exceptionType=java.lang.AssertionError
This seems the likely culprit:
(defn consume-channels [channels result-chan]
(doseq [out channels]
(async/thread
(while
(if-let [msg (async/<!! out)]
(do (async/>!! res-ch (do-something msg))
true)
false)))))
This is supposed to consume messages from a list of channels until the channels are closed.
do-something
is doing some io and takes ~10 seconds.
How should this be rewritten to avoid the above error?separate consuming from channels and processing first, i.e., async/merge
Then do your IO in pipeline blocking downstream
Second, I doubt this is the culprit. Why?
The assertion tells you you have enqueued too many takes on one channel. In your function you start a new thread per channel and only one thread is taking from the channel, so per thread there is no more than one enqueued take on the channel
The problem is probably somewhere where you call take!
, <!
or <!!
many times on one channel
I see, that makes sense, I will need to dig down where other take are done then. Thanks a lot
Yes, it points to the underlying throttled channel (https://github.com/brunoV/throttler) But this is only used in another place that I can see, which periodically checks that the channels can be created (a healthcheck)
of course:
2021-08-09T11:15:59.761+0100 ERROR Uncaught exception on async-dispatch-8571
java.lang.AssertionError: Assert failed: No more than 1024 pending takes are allowed on a single channel.
(< (.size takes) impl/MAX-QUEUE-SIZE)
at clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
at clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
at clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
at throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214.invoke(core.clj:38)
at throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212.invoke(core.clj:35)
at clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
at clojure.core.async.impl.ioc_macros$take_BANG_$fn__11362.invoke(ioc_macros.clj:991)
at clojure.core.async.impl.channels.ManyToManyChannel$fn__6980.invoke(channels.clj:265)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at clojure.core.async.impl.concurrent$counted_thread_factory$reify__6818$fn__6819.invoke(concurrent.clj:29)
at clojure.lang.AFn.run(AFn.java:22)
at java.lang.Thread.run(Thread.java:748)
2021-08-09T11:15:59.761+0100 INFO exceptionCaller=throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214# exceptionCallerClass=throttler.core$chan_throttler_STAR_$fn__32201$state_machine__11344__auto____32212$fn__32214 exceptionCallerLineNo=38 exceptionCallerMethod=invoke exceptionData=null exceptionMessage="Assert failed: No more than 1024 pending takes are allowed on a single channel.
Hard to say where the bug is, exactly, but I think you should open an issue to the library
Look, if token-value
is over 1024, it can break
https://github.com/brunoV/throttler/blob/master/src/throttler/core.clj#L52
Sure If you don't need bucketing just copy my simple rate limiter https://github.com/bsless/more.async/blob/master/src/main/clojure/more/async.clj#L738
yes I see ๐ not sure about this since it is for an external API and the limits there are not clear
https://github.com/bsless/more.async/blob/master/src/main/clojure/more/async.clj#L716