Fork me on GitHub
Joshua Suskalo00:09:00

I've been working with core.async for a decent amount of time, but mostly with a small subset consisting of parking put and take, basic channels with sized buffers, and the thread macro. At the moment I'm making a more complex set of processes which need to interact with each other, and as a result I've found need of mult. It seems like sometimes my messages are getting lost on the mult though. Are there any common pitfalls with mults which would cause this?


There are a number of open mult issues, I think they don't lead to dropped messages, but I am not sure

Joshua Suskalo01:09:24

Alright. Well I guess in this case I can just keep a vector of open channels since that will allow individual communication as well as broadcast communication.


You're not taking from the source Chan used to make the mult right?

Joshua Suskalo01:09:58

No, I don't believe so.


I think the open issues are mostly about cleaning up on untap and closing channels

Joshua Suskalo01:09:44

Fair enough. And also can confirm I'm not pulling from the channel used to make the mult.


just checking that I'm not missing something: If I have a channel, is there some way to replicate "attaching a callback" to the closing of that chan, and then passing the chan along, without wrapping take!/`<!!`?


I have a function that creates a chan that is designed to close, and I want it to return the chan, but there is also some cleanup stuff that needs to be done when the chan closes


There is no way to attach anything to respond to the closing of a channel. I often just use another channel to signal that kind of thing


gotcha, thanks


I guess I could use a mult and just take everything from a tapped chan and wait for it to return nil


maybe you could make something like pipe that takes a callback. just loop over the results and if you get null from the source channel, do the callback and close the sink channel


and of course, pass non-nil things onto the sink


oh yeah that's probably better


of course, if the sink is being taken from slowly, then there's the possibility that the source channel will be closed for some time before the pipe would see that the source is closed


so it depends on when exactly the callback needs to be called


nah this should be perfect for my use case


This is an example of what I cam up with: I'm wondering if there's an obvious way to get around the fact that I have to either use a predefined out-ch in put-msgs! there, or pass the output channel as a parameter and have to mange 2 channels everywhere? I beleive the way it currently is, with out-ch as just a channel with a preset buffer won't work in situations where that buffer isn't appropriate


looks vaguely like maybe a custom implementation of the Pub protocol for pub/sub


it is hard to tell what is going on there, I think maybe the intent is for the process and handler stuff to simulate so external queue or pubsub system?


it is weird that put-msgs! removes a single handler, but stops the loop that would be feeding any other handlers


oh yeah you're right it shouldn't stop the loop there


basically i have an incoming stream of messages and i basically want to be able to call a function that puts a copy of each message into a chan so that I can subject it to a transducer


and sometimes there will be a point that makes sense for these messages to end, which is expressed via the transducer


I do think a mult would work, just pass all messages to a mult and tap the new transducer-channels as needed. But I was sort of trying to avoid having to keep the mult somewhere and tapping it. (Though perhaps that's not any worse than just calling add handlers on the connection anyway)


why not have whatever is consuming result-chan trigger the clean up?


because then i'd have to keep the handler value h around so it could be removed, though I suppose there are design choices around this as well


(let [c (chan xf)
      _ (give-me-messages whatever c)
      messages (async/<! (async/into [] c))]
  (stop-giving-me-messages whatever c)


hmmmm my thinking was that sometimes the processes are longer running and won't actually be put into [] and will be handled more asynchronously


some of the transducer-channels will end up in a reduced state but sometimes they will just be closed later on "manually" and the messages handled as they come in, but either way I want to remove the handler on the closing of the chan


and my thinking was it'd be convenient to do that at the same place where it's attached and not have to worry about it


basically extending mult's handling of closed downstream channels to whatever your source of information is


maybe implement async/Mult for your source




or Pub, depending on if your handler stuff is meant to simulate different subscriptions or not


or, if you want to be really terrible(and if you are doing something pubsubish), create a standard async/pub, then use reflection to get access to the atom inside it, and add a watch to that atom to be notified when the topics subscribed to change


haha wow. I think I'll try the mult thing first