#core-async
2019-11-28
hiredman00:11:39

if you have any use of try/catch/finally in your real code you might try without it; there are some bugs in how the go macro compiles it, which can lead to weirdness

jjttjj00:11:24

I don't believe so

jjttjj00:11:01

Oh. It does take place in an add-watch fn.

😬 4
jjttjj00:11:17

will try to add that to the repro code

hiredman00:11:23

I bet if you go through and put printlns everywhere, you will find two printlns that happen in a different order than you expect

hiredman00:11:33

and it'll be something like the tap isn't happening until after all the go loops have published their values to a mult where nobody is listening

jjttjj00:11:22

(def ch1 (a/chan 1))
(def m (a/mult ch1))

(def aa (atom nil))

(add-watch aa ::aa
           (fn [k r o n]
             (let [ch2 (a/tap m (a/chan 1))]
               (log/debug "watch changed")
               (a/go
                 (when-let [x (a/<! ch2)]
                   (println "GOT2" x)
                   ;;(a/close! ch2)
                   )))))

(doseq [x (range 10)]
  (a/put! ch1 x)
  ;;(a/go (a/>! ch1 x))
  )


(def but
  (d/div
   (d/button {:onclick #(a/put! ch1 (rand))} "put")
   (d/button {:onclick #(reset! aa 2)} "reset")))
So this is closer to the real thing, and now the go loop never prints, with or without the close!

jjttjj00:11:47

(i realize now the watch function is dumb, and i could stream the changes as another channel but should this work anyway?)

hiredman00:11:02

all the puts flow through the mult and go nowhere because there is no one listening

hiredman00:11:02

and then later you click the button, it resets the atom, which creates a tap, but by that point there is nothing there

hiredman00:11:33

by all the puts I mean the a/put! in the doseq
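
A minimal sketch of the behaviour described here, assuming JVM Clojure with `clojure.core.async` required as `a` (the original repro is ClojureScript, where the blocking `>!!`/`alts!!` calls would have to become go blocks). The names `src`, `late`, and `result` are made up for illustration. It shows that values put through a mult before any tap exists are not kept around for later taps:

```clojure
(require '[clojure.core.async :as a])

(def src (a/chan 1))
(def m (a/mult src))

;; No taps exist yet, so the mult takes each value from src and hands it to nobody.
(doseq [x (range 3)]
  (a/>!! src x))

;; Give the mult's internal go loop a moment to drain src before tapping.
(Thread/sleep 200)

;; This tap is created only after the puts, like the tap made inside the watch fn.
(def late (a/tap m (a/chan 1)))

;; Wait briefly for a value; the timeout channel wins because nothing is left to deliver.
(def result
  (let [[v port] (a/alts!! [late (a/timeout 200)])]
    (if (= port late) v :nothing-received)))
```

Under these assumptions `result` ends up as `:nothing-received`, which matches the "by that point there is nothing there" explanation.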

jjttjj00:11:39

ah, i think that's still a case of a bad repro. sorry. The real thing creates the tap, then makes a request that should result in the puts, then does the go block

hiredman00:11:19

it should result in puts, but does it?

jjttjj00:11:46

(def ch1 (a/chan 1))
(def m (a/mult ch1))

(def aa (atom nil))

(defn do-puts! []
  (doseq [x (range 10)]
    (a/put! ch1 x)
    ;;(a/go (a/>! ch1 x))
    ))

(add-watch aa ::aa
           (fn [k r o n]
             (let [ch2 (a/tap m (a/chan 1))]
               (do-puts!)
               (log/debug "watch changed")
               (a/go
                 (when-let [x (a/<! ch2)]
                   (println "GOT2" x)
                   ;;(a/close! ch2)
                   )))))




(def but
  (d/div
   (d/button {:onclick #(a/put! ch1 (rand))} "put")
   (d/button {:onclick #(reset! aa 2)} "reset")))

jjttjj00:11:39

and now it's back to working with and without the close!

jjttjj00:11:01

(the go block print executes on reset of the atom)

hiredman00:11:20

you are getting at least one println out

hiredman00:11:58

and what is happening is the mult is getting clogged up because you never untap after your watch is done

hiredman00:11:11

the close is effectively an untap
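
A rough sketch of the untap idea, assuming JVM Clojure and `clojure.core.async` as `a`. `try-put!` is a hypothetical helper (not part of core.async) that gives up on a put after 300 ms so a clogged mult shows up as `:stuck` instead of a hang; `one-shot` stands in for the channel tapped inside the watch fn:

```clojure
(require '[clojure.core.async :as a])

(defn try-put!
  "Hypothetical helper: blocking put that gives up after 300 ms."
  [ch x]
  (let [[ok port] (a/alts!! [[ch x] (a/timeout 300)])]
    (if (= port ch) ok :stuck)))

(def src (a/chan))
(def m (a/mult src))
(def one-shot (a/tap m (a/chan 1)))

;; Take exactly one value, as the go block in the watch fn does.
(a/>!! src :first)
(def got (a/<!! one-shot))

;; Done with this tap: detach it so the mult stops delivering to it.
;; Calling (a/close! one-shot) here instead would get it removed on the next put.
(a/untap m one-shot)

;; Later puts keep flowing because no abandoned tap is left to fill up.
(def later-puts (mapv #(try-put! src %) [:a :b :c]))
```

Without the `untap` (or `close!`), the mult would keep delivering to `one-shot`, fill its one-slot buffer, and then park waiting for a take that never comes.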

hiredman00:11:57

in your real world case you can replace the a/close! with a/untap, see that it makes it work, and confirm that this is correct

jjttjj00:11:08

oh yeah, sure

hiredman00:11:32

which means the whole watch thing happens at least once, so you should see at least one println, and if you don't, you can start trying to figure out why

jjttjj00:11:38

ok this is confusing. The untap works, but doesn't seem to actually untap anything judging from liberal printlns

jjttjj00:11:57

i mean the print happens with the untap, same as no close! or untap.

jjttjj00:11:27

but the tapped chans are still receiving values after the untap

hiredman00:11:03

just for the sake of it, try untap-all

jjttjj00:11:26

untap-all works as expected!

hiredman00:11:33

I suspect somewhere, somehow you do have a tap, where the channel being tapped to is not being consumed from

hiredman00:11:46

which will block the mult
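
To illustrate the suspicion above, here is a small sketch (JVM Clojure, `clojure.core.async` as `a`) where one tap is consumed and another is forgotten. `try-put!`, `consumed`, `forgotten`, and `seen` are hypothetical names for this example only:

```clojure
(require '[clojure.core.async :as a])

(defn try-put!
  "Hypothetical helper: blocking put that gives up after 300 ms."
  [ch x]
  (let [[ok port] (a/alts!! [[ch x] (a/timeout 300)])]
    (if (= port ch) ok :stuck)))

(def src (a/chan))
(def m (a/mult src))
(def consumed  (a/tap m (a/chan 1)))
(def forgotten (a/tap m (a/chan 1))) ; tapped, but nobody ever takes from it

;; Drain `consumed` into an atom so we can see what the healthy tap received.
(def seen (atom []))
(a/go-loop []
  (when-some [x (a/<! consumed)]
    (swap! seen conj x)
    (recur)))

;; 1 fits in forgotten's buffer. 2 is taken by the mult, which then parks
;; waiting for room in forgotten. 3 is never taken from src.
(def puts (mapv #(try-put! src %) [1 2 3]))
```

In this sketch `puts` comes out as `[true true :stuck]` and `@seen` as `[1 2]`: the consumed tap is fine, yet the whole mult stalls on the tap that nobody reads.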

jjttjj00:11:47

i can't seem to find one at the moment (which doesn't mean it doesn't exist).

jjttjj00:11:10

gotta run out for now, probably best to step away from this for a bit anyway, but thanks for all the help/patience!

jjttjj16:11:20

I finally tracked this down! I was basically using code inspired by/equivalent to this: https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services/blob/master/src/com/capitalone/commander/api.clj#L288-L292 This provides a way to get a copy of a single source chan, but with its events transformed (by piping through a chan with a transducer). The transformation chan that's actually tapped is closed over and thus unable to be closed/untapped

jjttjj16:11:09

Out of curiosity, this code is probably inherently wrong/bad, right? The transformation (i.e. the (map command-map) transducer above) should be done before the mult is created, right?

jjttjj16:11:16

Putting a sliding/dropping buffer on the int chan gets the job done also but it still doesn't allow for untapping
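
One possible way around the closed-over channel, sketched under assumptions: instead of tapping a hidden intermediate channel and piping it onward, put the transducer on the tapped channel itself and hand that channel back to the caller. `tap-with-xform` is a hypothetical helper, not the linked project's code and not a core.async function; `(map inc)` stands in for the real transformation:

```clojure
(require '[clojure.core.async :as a])

(defn tap-with-xform
  "Hypothetical helper: taps mult m with a channel that applies xform and
  returns that channel, so the caller holds the thing that is actually tapped."
  [m xform]
  (a/tap m (a/chan 1 xform)))

(def src (a/chan))
(def m (a/mult src))

(def incremented (tap-with-xform m (map inc)))

(a/>!! src 1)
(def got (a/<!! incremented))

;; The caller can detach and close the exact channel that was tapped.
(a/untap m incremented)
(a/close! incremented)

;; With no taps left, the mult keeps taking from src.
(def still-flowing
  (let [[ok port] (a/alts!! [[src 2] (a/timeout 300)])]
    (and (= port src) ok)))
```

Here `got` is the transformed value `2`, and because the caller owns `incremented` it can be untapped when the component goes away. Whether to transform per tap like this or once before the mult depends on whether every consumer wants the same transformation.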

jjttjj17:11:56

What is the ideal usage of mult anyway? I have a clojurescript app and have a single channel that serves as the point of entry for all messages from a server. I have to distribute those messages to various ui components. I've tended to approach this by making a mult wrapper for that channel at the top level, and creating helper functions of that mult such as the commander code above, and call these from the ui components. Is it better to just use the main channel instead and have each component create its own mult off that if needed?

jjttjj17:11:39

Actually I guess the mult per component wouldn't work because then nothing could take from the main incoming channel, multiple mults would race to take messages, etc

Jan K17:11:13

Sounds like pub/sub (which is implemented using mults) would be more suitable for that

jjttjj18:11:16

if I understand the doc string correctly, a pub won't globally get "stuck" on one held up sub on any topic, but rather each sub is its own mult and can only hold up other sub'ed chans with the same topic, right?

Jan K18:11:29

With the defaults a pub can actually get stuck globally if there's a slow subscriber, since the mults are attached to unbuffered channels. You could pass a buf-fn to the pub to avoid it (drop messages instead of blocking), or just make sure to subscribe to non-blocking channels.
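
A sketch of the difference Jan K describes, assuming JVM Clojure and `clojure.core.async` as `a`. `fast-topic-delivered?` is a hypothetical check written for this example; the `:topic` key, the topic names, and the buffer size 8 are arbitrary choices:

```clojure
(require '[clojure.core.async :as a])

(defn fast-topic-delivered?
  "Hypothetical check: does a :fast message arrive while a :slow subscriber
  never reads? make-pub is a fn from source channel to pub."
  [make-pub]
  (let [src  (a/chan 10)
        p    (make-pub src)
        slow (a/chan)   ; subscribed below but never consumed
        fast (a/chan 1)]
    (a/sub p :slow slow)
    (a/sub p :fast fast)
    ;; Two messages for the stuck topic, then one for the healthy topic.
    (doseq [msg [{:topic :slow :n 1} {:topic :slow :n 2} {:topic :fast :n 3}]]
      (a/>!! src msg))
    (let [[_ port] (a/alts!! [fast (a/timeout 300)])]
      (= port fast))))

;; Default pub: per-topic channels are unbuffered.
(def with-defaults
  (fast-topic-delivered? (fn [src] (a/pub src :topic))))

;; Pub with a buf-fn: each topic gets a sliding buffer, so old messages
;; for a stuck topic are dropped instead of blocking the pub.
(def with-buf-fn
  (fast-topic-delivered?
   (fn [src] (a/pub src :topic (fn [_topic] (a/sliding-buffer 8))))))
```

In this sketch `with-defaults` is `false` (the pub parks on the second `:slow` message and never reaches `:fast`), while `with-buf-fn` is `true`. The other option mentioned above, subscribing with channels that have sliding or dropping buffers, trades the same way: slow consumers lose messages rather than holding everyone up.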