Is there a way to clear a channel while maintaining all publications and subscribers? Here’s my use case:
I’m building phone AI agents. An LLM generates voice outputs that I put on a channel.
In a different go-loop, I take data from this channel and send it to the user as a playback. If the user says something (interrupting the AI speech), all the remaining LLM events from the AI Output channel must be cleared before new ones can be added.
Currently, I cheat, and I keep my channel in an atom, and when an interruption happens, I reset my atom to a new channel, and the LLM generation just uses
(>! @ai-channel event)
Ideally, I want to keep the same channel so I can add publications and subscribers to do custom async work on it, which doesn’t seem like it can work with resetting the channel.
Any ideas? I’d want a (clear! ai-channel) that supersedes any (<! ai-channel)
To simplify, in a pipeline, when an event happens, interrupt the pipeline, clear all pending items from the pipeline and then resume. What is the best way to do this with core.async?
not 100% I'm visualizing what you mean correctly, but maybe something like this?
(defn clear [ch]
(while (some? (a/poll! ch))))
(a/go-loop []
(let [event (a/! my-pub-input-ch))))Maybe a channel of channels? Start by taking a channel from the channel of channels, then loop over an alt of the taken channel and the channel of channels
Output goes into a channel that is placed on the channel of channels, and you interrupt by putting a new channel on the channel of channels
I think a channel of channels might work, actually. I don’t have a clear image of how to use a/pub & a/sub on this, but it sounds very promising. Thank you