This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-10-27
Channels
- # announcements (11)
- # aws (1)
- # babashka (15)
- # beginners (345)
- # calva (5)
- # chlorine-clover (14)
- # clj-kondo (40)
- # cljfx (30)
- # clojure (174)
- # clojure-australia (6)
- # clojure-europe (64)
- # clojure-france (1)
- # clojure-nl (12)
- # clojure-uk (20)
- # clojurescript (29)
- # conjure (1)
- # core-async (53)
- # cryogen (3)
- # cursive (8)
- # datomic (12)
- # emacs (5)
- # events (4)
- # fulcro (27)
- # graalvm (1)
- # graphql (9)
- # helix (8)
- # keechma (11)
- # london-clojurians (1)
- # malli (12)
- # off-topic (12)
- # pathom (8)
- # portal (1)
- # re-frame (19)
- # reagent (13)
- # reitit (16)
- # ring-swagger (2)
- # sci (38)
- # shadow-cljs (31)
- # spacemacs (3)
- # specter (14)
- # sql (14)
- # tools-deps (31)
- # vim (4)
- # xtdb (9)
so, I've started using this:
(defn full? [c]
(.full? (.buf c)))
(defn msgs-on-chan [c]
(.count (.buf c)))
;; Thank you Dan Compton
(defn chan-status [chan-map chan-keys]
(into {}
(map (fn [[k c]]
[k {:full? (full? c)
:backlog (msgs-on-chan c)}]))
(select-keys chan-map chan-keys)))
to just get an idea if my channels are stalled (it has already found one bug for me where I tapped a channel on a mult but didn't put anything on to consume from that channel
I see that I shouldn't use this kind of thing in production, but it has been pretty good for seeing what I might have done wrong in development. It is also giving me a clue about which channels might be running slowly and how to size things. Is there a better way of doing this?
I'm trying to 1. See if things are actually flowing through my channels 2. See if I've got my channels appropriately sized
You can also implement your own buffer (based on the built-in ones) and add logging when it's getting full.
@otfrom logic that uses full?
or msgs-on-chan
can have race conditions that core.async would usually make impossible
this is why these sorts of queries don't exist in the lib itself
but I see you are just using it for diagnostics, not application logic
that's right. I had 4 channels hanging off a mult and I forgot to put the consuming process on one of them. The other 3 drained and the last one was full
one could even make a metrics dashboard for core.async channels, like the ones used in big distsys frameworks :D
something like
(deftype MeteredChannel [take-meter put-meter ch]
impl/Channel
(close! [_]
(impl/close! ch))
(closed? [_]
(impl/closed? ch))
impl/ReadPort
(take! [_ fn1]
(take-meter)
(impl/take! ch fn1))
impl/WritePort
(put! [_ val fn1]
(put-meter)
(impl/put! ch val fn1)))
might be useful for that kind of thing, and less prone to abuse, you would look for discrepancies in the rate of calls to take-meter and put-meterfull? and msgs-on-chan above bother directly examine the buffer, which is a mutable not-threadsafe collection (the channel uses locks when mutating it)
hey, i’m converting an async tree walker from futures to core async, and a couple of things seem to be not quite right with how I’m going about it. • using core async recursively isn’t quite working out - it stalls at a certain depth. I’m guessing this is because it’s blocking until a thread becomes available..? • when I do block core async like this, how can i reset the thread executor to start again?
https://github.com/clj-pour/pour/pull/14/files?w=1 PR I’m working on, for reference
so, recursion where the parent blocks waiting for the child to complete is a bad idea? thought it might be, but was wondering about other ways to go about it
the only way to start again, if CA does get into a fully blocked state, is to restart the repl..?
i’m using this as an opportunity to learn more about ca, making many mistakes like this along the way
there might be some nonsense you can do, that could unstick things, but very likely will just end up in a wilder more unknown state
you cannot do any real blocking operations in the call graph of a go block, only >! <! and the flavors of alts!
if you are using datomic, I don't know, but my guess would be the datomic async api is also using core.async, and the sync api uses the async api with a real blocking <!! at the "root"
so you could be filling the threadpool with tasks that are blocked on some datomic operation that are being it in the threadpool
I am using datomic yep, and this is a kind of experiment in using a custom pull
-like approach where keywords can be given behaviour and weaved into the results
seems like core async is punishing me a bit for not defining a bit more precisely what it should do
the thing about measuring the buffer usage instead of the flow of messages through the channel, is it doesn't tell you anything useful for unbuffered channels, or channels with dropping or sliding buffers
if interested, figured it out - i think. I was using <!
inside reduce, map, keep etc - I turned this around to get a collection of channels instead of a collection of values, and all was ok 🙂
combing sequence processing with core.async is tricky because map, reduce, etc and it is an easy mistake to make to introduce blocking in that function
I was taking inside a keep
for example, and that complained about <! not being inside a go block too