#core-async
2020-10-27
otfrom13:10:44

so, I've started using this:

(defn full? [c]
  (.full? (.buf c)))

(defn msgs-on-chan [c]
  (.count (.buf c)))

;; Thank you Dan Compton 
(defn chan-status [chan-map chan-keys]
  (into {}
        (map (fn [[k c]]
               [k {:full? (full? c)
                   :backlog (msgs-on-chan c)}]))
        (select-keys chan-map chan-keys)))

otfrom13:10:18

to just get an idea of whether my channels are stalled (it has already found one bug for me, where I tapped a channel on a mult but didn't start anything to consume from that channel)

otfrom13:10:27

I see that I shouldn't use this kind of thing in production, but it has been pretty good for seeing what I might have done wrong in development. It is also giving me a clue about which channels might be running slowly and how to size things. Is there a better way of doing this?

otfrom13:10:46

I'm trying to:
1. See if things are actually flowing through my channels
2. See if I've got my channels appropriately sized

Jan K14:10:03

You can also implement your own buffer (based on the built-in ones) and add logging when it's getting full.
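A minimal sketch of that suggestion: wrap a built-in fixed buffer in a deftype that delegates every operation but calls a warning function when the buffer fills. The names `LoggingBuffer`, `logging-chan`, and `warn-fn` are my own, and the `Buffer` protocol lives in core.async internals (`clojure.core.async.impl.protocols`), which are not a public API and may change between versions.

```clojure
(ns user
  (:require [clojure.core.async :as a]
            [clojure.core.async.impl.protocols :as impl]
            [clojure.core.async.impl.buffers :as buffers]))

;; Delegating buffer that logs when it becomes full.
;; Relies on core.async internals; illustrative, not a stable API.
(deftype LoggingBuffer [buf warn-fn]
  impl/Buffer
  (full? [_] (impl/full? buf))
  (remove! [_] (impl/remove! buf))
  (add!* [this itm]
    (impl/add!* buf itm)
    (when (impl/full? buf)
      (warn-fn (count buf)))       ; called with the current item count
    this)
  (close-buf! [_] (impl/close-buf! buf))
  clojure.lang.Counted
  (count [_] (count buf)))

(defn logging-chan
  "A channel with a fixed buffer of size n that warns when it fills up."
  [n]
  (a/chan (->LoggingBuffer (buffers/fixed-buffer n)
                           (fn [cnt]
                             (println "channel buffer full:" cnt "items")))))
```

The channel behaves like an ordinary buffered channel; the warning fires inside `add!*`, i.e. under the channel's own lock, so keep `warn-fn` cheap.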

noisesmith16:10:05

@otfrom logic that uses full? or msgs-on-chan can have race conditions that core.async would usually make impossible

noisesmith16:10:17

this is why these sorts of queries don't exist in the lib itself

noisesmith16:10:59

but I see you are just using it for diagnostics, not application logic

otfrom16:10:08

that's right. I had 4 channels hanging off a mult and I forgot to put the consuming process on one of them. The other 3 drained and the last one was full

otfrom16:10:20

It helped me figure it out right away

noisesmith16:10:38

one could even make a metrics dashboard for core.async channels, like the ones used in big distsys frameworks :D

hiredman17:10:08

something like

;; (impl here is clojure.core.async.impl.protocols)
(deftype MeteredChannel [take-meter put-meter ch]
  impl/Channel
  (close! [_]
    (impl/close! ch))
  (closed? [_]
    (impl/closed? ch))
  impl/ReadPort
  (take! [_ fn1]
    (take-meter)
    (impl/take! ch fn1))
  impl/WritePort
  (put! [_ val fn1]
    (put-meter)
    (impl/put! ch val fn1)))
might be useful for that kind of thing, and less prone to abuse; you would look for discrepancies in the rate of calls to take-meter and put-meter

👍 6
hiredman17:10:35

full? and msgs-on-chan above both directly examine the buffer, which is a mutable, non-threadsafe collection (the channel uses locks when mutating it)

dazld17:10:30

hey, i’m converting an async tree walker from futures to core async, and a couple of things seem to be not quite right with how I’m going about it.
• using core async recursively isn’t quite working out - it stalls at a certain depth. I’m guessing this is because it’s blocking until a thread becomes available..?
• when I do block core async like this, how can i reset the thread executor to start again?

hiredman17:10:03

don't do blocking things in a go block

dazld17:10:58

so, recursion where the parent blocks waiting for the child to complete is a bad idea? thought it might be, but was wondering about other ways to go about it

hiredman17:10:23

if you are calling a function that contains <!! that is blocking

dazld17:10:14

understood

dazld17:10:40

the only way to start again, if CA does get into a fully blocked state, is to restart the repl..?

dazld17:10:12

i’m using this as an opportunity to learn more about ca, making many mistakes like this along the way

hiredman17:10:27

there might be some nonsense you can do, that could unstick things, but very likely will just end up in a wilder more unknown state

👍 3
hiredman17:10:30

you cannot do any real blocking operations in the call graph of a go block, only >! <! and the flavors of alts!
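A common way around that restriction: push the blocking call onto `clojure.core.async/thread`, which runs it on core.async's unbounded pool for blocking operations and returns a channel you can park on with `<!` from inside a go block. Here `blocking-lookup` is a hypothetical stand-in for something like a blocking datomic call.

```clojure
(require '[clojure.core.async :as a :refer [go <! thread]])

;; hypothetical blocking operation
(defn blocking-lookup [x]
  (Thread/sleep 10)
  (* x 2))

;; wrong: (go (blocking-lookup 21)) ties up one of the small, fixed
;; number of go threads for the whole sleep

;; right: run the blocking work on `thread`, park on its result channel
(go
  (let [v (<! (thread (blocking-lookup 21)))]  ; parks, frees the go thread
    (println "result:" v)))
```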

dazld17:10:36

so, on that PR, I only have (afaict) one blocking call, at the root

hiredman17:10:13

hard to say, I dunno what knit does, and there are some callbacks passed around

hiredman17:10:23

if you are using datomic, I don't know, but my guess would be the datomic async api is also using core.async, and the sync api uses the async api with a real blocking <!! at the "root"

hiredman17:10:18

so you could be filling the threadpool with tasks that are blocked on some datomic operation that is itself waiting on the threadpool

dazld17:10:38

I am using datomic yep, and this is a kind of experiment in using a custom pull-like approach where keywords can be given behaviour and woven into the results

dazld17:10:54

seems like core async is punishing me a bit for not defining a bit more precisely what it should do

hiredman17:10:14

the thing about measuring buffer usage instead of the flow of messages through the channel is that it doesn't tell you anything useful for unbuffered channels, or channels with dropping or sliding buffers

dazld18:10:43

if interested, figured it out - i think. I was using <! inside reduce, map, keep etc - I turned this around to get a collection of channels instead of a collection of values, and all was ok 🙂
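A sketch of that reshaping, under assumed names (`process` stands in for the per-node work, `process-all` for the traversal): instead of calling `<!` inside `map`/`keep` (where the go transformation can't reach it), map each input to a channel eagerly, then park on each channel in a plain loop that is lexically inside the go block.

```clojure
(require '[clojure.core.async :as a :refer [go go-loop <!]])

;; hypothetical per-item work; returns a channel
(defn process [x]
  (go (* x x)))

(defn process-all [xs]
  (let [chs (mapv process xs)]          ; kick all the work off first
    (go-loop [chs chs, acc []]
      (if-let [[c & more] (seq chs)]
        (recur more (conj acc (<! c)))  ; <! is visible to go-loop here
        acc))))
```

Mapping to channels first also means the work runs concurrently; taking inside the loop only sequences the collection of results.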

dazld18:10:28

would be interested in why, though

hiredman18:10:48

you were recursively calling parse which was blocking the thread with <!!

dazld18:10:30

interestingly, not - parse wasn’t using any blocking calls

hiredman18:10:39

parse was blocking

dazld18:10:41

it did at one point, sorry if you saw an older version

dazld18:10:07

I changed it out to return a channel from go-loop

hiredman18:10:24

yes, you made that change since I last looked

dazld18:10:11

sorry, i thought had updated it

hiredman18:10:24

combining sequence processing with core.async is tricky, because map, reduce, etc. take plain functions, and it is an easy mistake to introduce blocking in one of those functions

dazld18:10:36

bingo, that was exactly what i did

dazld18:10:48

have to process into channels

dazld18:10:32

I was taking inside a keep for example, and that complained about <! not being inside a go block too

hiredman18:10:36

you might want to look at using transducers, either on channels directly, or as part of a pipeline
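A short sketch of the two placements hiredman mentions (channel names `ch`, `in`, `out` are mine):

```clojure
(require '[clojure.core.async :as a])

;; 1. transducer on the channel itself: every value passing through the
;;    buffer goes through the transformation
(def ch (a/chan 10 (comp (map inc) (filter even?))))

;; 2. transducer in a pipeline: values flow from `in` to `out` through
;;    (map inc), processed with parallelism 4
(def in  (a/chan 10))
(def out (a/chan 10))
(a/pipeline 4 out (map inc) in)

(a/>!! in 1)
(a/<!! out) ;=> 2
```

Either way the transformation happens outside your own go blocks, so there is no fn boundary for `<!` to get trapped behind.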

dazld18:10:13

do you have some examples..?

dazld18:10:27

All good, I’ll look into it, thank you for help

hiredman18:10:00

the go transformation does not cross function boundaries

👍 3
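A small illustration of that boundary (channel name `c` is mine): `go` rewrites only the code it can see lexically, so a `<!` inside an anonymous fn passed to `mapv` is a separate function body the rewrite never reaches.

```clojure
(require '[clojure.core.async :as a :refer [go <!]])

(def c (a/chan 1))

;; throws "<! used not in (go ...) block" when the fn is called,
;; because the fn body is outside the go transformation:
#_(go (mapv (fn [x] (<! c)) [1 2 3]))

;; fine: the take is lexically inside the go body
(def result (go (<! c)))
(a/>!! c :hello)
```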
dazld18:10:08

ah hah, good to know

dazld18:10:33

this is quite a bit faster than the future version now too ❤️