#core-async
2019-09-06
Joshua Suskalo00:09:00

I've been working with core.async for a decent amount of time, but mostly with a small subset consisting of parking put and take, basic channels with sized buffers, and the thread macro. At the moment I'm making a more complex set of processes which need to interact with each other, and as a result I've found need of mult. It seems like sometimes my messages are getting lost on the mult though. Are there any common pitfalls with mults which would cause this?

hiredman01:09:40

There are a number of open mult issues, I think they don't lead to dropped messages, but I am not sure

Joshua Suskalo01:09:24

Alright. Well I guess in this case I can just keep a vector of open channels since that will allow individual communication as well as broadcast communication.

jjttjj01:09:23

You're not taking from the source Chan used to make the mult right?
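For context on why this matters: a mult consumes its source channel itself and copies each value to every tap, so any other taker on the source competes with the mult for values and the taps never see what that taker received. A minimal sketch of the intended usage, with all names (`source`, `m`, `tap-a`, `tap-b`) chosen here for illustration:

```clojure
(require '[clojure.core.async :as async])

;; The source channel is handed to the mult and should only be written to.
(def source (async/chan))
(def m (async/mult source))

;; Each consumer reads from its own tapped channel, never from `source`.
(def tap-a (async/tap m (async/chan 10)))
(def tap-b (async/tap m (async/chan 10)))

(async/>!! source :hello)

;; Both taps receive a copy of the value.
(def from-a (async/<!! tap-a))
(def from-b (async/<!! tap-b))

;; Pitfall (not executed here): a stray (async/<!! source) elsewhere would
;; race with the mult, and whatever it took would never reach the taps.
```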

Joshua Suskalo01:09:58

No, I don't believe so.

hiredman01:09:14

I think the open issues are mostly about cleaning up on untap and closing channels

Joshua Suskalo01:09:44

Fair enough. And also can confirm I'm not pulling from the channel used to make the mult.

jjttjj16:09:18

just checking that I'm not missing something: If I have a channel, is there some way to replicate "attaching a callback" to the closing of that chan, and then passing the chan along, without wrapping take!/`<!!`?

jjttjj16:09:13

I have a function that creates a chan that is designed to close, and I want it to return the chan, but there is also some cleanup stuff that needs to be done when the chan closes

hiredman17:09:49

There is no way to attach anything to respond to the closing of a channel. I often just use another channel to signal that kind of thing

jjttjj17:09:27

gotcha, thanks

jjttjj17:09:34

I guess I could use a mult and just take everything from a tapped chan and wait for it to return nil

markmarkmark17:09:57

maybe you could make something like pipe that takes a callback. just loop over the results and if you get nil from the source channel, do the callback and close the sink channel

markmarkmark17:09:13

and of course, pass non-nil things onto the sink
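A rough sketch of that idea, assuming the callback is a zero-argument function. `pipe-with-callback` and `on-close` are hypothetical names, not part of core.async:

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! >! close!]])

(defn pipe-with-callback
  "Copies values from `from` to `to`. When `from` closes, calls `on-close`
  (a zero-argument function) and then closes `to`. Returns `to`.
  Hypothetical helper, not part of core.async."
  [from to on-close]
  (go-loop []
    (let [v (<! from)]
      (if (nil? v)
        ;; nil means the source is closed: run the cleanup, then close the sink
        (do (on-close)
            (close! to))
        ;; >! returns false if the sink is already closed; stop looping then
        (when (>! to v)
          (recur)))))
  to)
```

In this sketch the callback does not run if the sink is closed first; whether it should is a design choice that depends on the cleanup involved.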

jjttjj17:09:35

oh yeah that's probably better

markmarkmark17:09:39

of course, if the sink is being taken from slowly, then there's the possibility that the source channel will be closed for some time before the pipe would see that the source is closed

markmarkmark17:09:49

so it depends on when exactly the callback needs to be called

jjttjj17:09:51

nah this should be perfect for my use case

jjttjj21:09:14

This is an example of what I came up with: https://gist.github.com/jjttjj/854e725c2ae1e0a1f3b5a4b235252df3 I'm wondering if there's an obvious way to get around the fact that I have to either use a predefined out-ch in put-msgs! there, or pass the output channel as a parameter and have to manage 2 channels everywhere? I believe the way it currently is, with out-ch as just a channel with a preset buffer, won't work in situations where that buffer isn't appropriate

hiredman21:09:10

looks vaguely like maybe a custom implementation of the Pub protocol for pub/sub

hiredman21:09:29

it is hard to tell what is going on there, I think maybe the intent is for the process and handler stuff to simulate some external queue or pubsub system?

hiredman21:09:13

it is weird that put-msgs! removes a single handler, but stops the loop that would be feeding any other handlers

jjttjj21:09:32

oh yeah you're right it shouldn't stop the loop there

jjttjj21:09:29

basically i have an incoming stream of messages and i basically want to be able to call a function that puts a copy of each message into a chan so that I can subject it to a transducer

jjttjj21:09:16

and sometimes there will be a point that makes sense for these messages to end, which is expressed via the transducer

jjttjj21:09:09

I do think a mult would work, just pass all messages to a mult and tap the new transducer-channels as needed. But I was sort of trying to avoid having to keep the mult somewhere and tapping it. (Though perhaps that's not any worse than just calling add handlers on the connection anyway)
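A minimal sketch of that mult approach, assuming the messages are plain values on a single source channel. `tap-with-xf` is a hypothetical helper, and the transducer is just an example of one that ends the stream on its own:

```clojure
(require '[clojure.core.async :as async])

(def source (async/chan))
(def m (async/mult source))

(defn tap-with-xf
  "Taps mult `m` with a new buffered channel that applies transducer `xf`.
  Hypothetical helper; the buffer size of 10 is arbitrary."
  [m xf]
  (async/tap m (async/chan 10 xf)))

;; This channel closes itself once the transducer is done (after two evens).
(def evens (tap-with-xf m (comp (filter even?) (take 2))))

;; Feed messages; the mult drops the closed tap and keeps consuming.
(doseq [n (range 10)]
  (async/>!! source n))
(async/close! source)

(def result (async/<!! (async/into [] evens)))
```

Because the mult removes a tap once a put to it fails, no explicit untap is needed here for a channel whose transducer has finished.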

hiredman21:09:23

why not have whatever is consuming result-chan trigger the clean up?

jjttjj21:09:00

because then i'd have to keep the handler value h around so it could be removed, though I suppose there are design choices around this as well

hiredman21:09:12

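(async/go
  (let [c (async/chan 1 xf) ; a transducer requires a buffered channel
        _ (give-me-messages whatever c)
        ;; parks until c closes, then yields every message as a vector
        messages (async/<! (async/into [] c))]
    (stop-giving-me-messages whatever c)
    messages))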

jjttjj21:09:15

hmmmm my thinking was that sometimes the processes are longer running and won't actually be put into [] and will be handled more asynchronously

jjttjj21:09:36

some of the transducer-channels will end up in a reduced state but sometimes they will just be closed later on "manually" and the messages handled as they come in, but either way I want to remove the handler on the closing of the chan

jjttjj21:09:24

and my thinking was it'd be convenient to do that at the same place where it's attached and not have to worry about it

hiredman21:09:44

basically extending mult's handling of closed downstream channels to whatever your source of information is

hiredman21:09:10

maybe implement async/Mult for your source

jjttjj21:09:42

interesting

hiredman21:09:15

or Pub, depending on if your handler stuff is meant to simulate different subscriptions or not

hiredman21:09:22

or, if you want to be really terrible (and if you are doing something pubsubish), create a standard async/pub, then use reflection to get access to the atom inside it, and add a watch to that atom to be notified when the topics subscribed to change

jjttjj22:09:14

haha wow. I think I'll try the mult thing first