Another idea I'm having trouble translating from regular core.async to core.async.flow is back-propagating a signal from consumers that they no longer need further input from producers.
In core.async, a consumer can take from a channel and then close the channel when they're no longer interested in new messages. Producers can call async/>! and if the return value is false, they know the channel is closed and there's no point in producing any more outputs.
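To make the plain core.async behavior concrete, here is a minimal sketch. The `produce!` function and its arguments are hypothetical names for illustration only; the only library behavior relied on is that `>!` returns false when the channel is already closed.

```clojure
(require '[clojure.core.async :as a])

(defn produce!
  "Hypothetical producer: puts the (non-nil) items of `coll` onto `out` one
  at a time and stops at the first put that returns false, i.e. when `out`
  is already closed. Returns a channel that yields the number of items sent."
  [out coll]
  (a/go-loop [items (seq coll)
              sent  0]
    (if (nil? items)
      sent
      (if (a/>! out (first items))
        (recur (next items) (inc sent))
        sent))))

;; The consumer has lost interest and closed the channel, so the very first
;; put returns false and the producer stops without sending anything.
(let [out (a/chan 1)]
  (a/close! out)
  (a/<!! (produce! out [1 2 3])))
;; => 0
```

Note that this sketch only covers puts attempted after the close; it makes no claim about a put that is already parked at the moment the consumer closes the channel.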
In this particular example, I have a media file that produces packets, and from packets you can extract frames, i.e. file -> packets -> frames.
I'm implementing trimming of a media file given an end timestamp. Because of how decoding works, packets may not be in presentation order, so you don't actually know you've reached the last frame in your timestamp range until you've decoded a frame with a timestamp past the end timestamp. The packet and frame decoders are separate flow procs. Ideally, the frame trimming would also be its own proc (e.g. file -> packets -> frames -> timestamp trimmer). In core.async, the timestamp trimmer could close its input channel, which would signal the frame decoder to stop producing. The frame decoder could then close its own input channel, which would signal the packet decoder, and so on up the chain. There doesn't seem to be an analogous concept in core.async.flow.
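For reference, the plain core.async version of the trimmer stage might look roughly like the sketch below. Everything here is an assumption for illustration: `trim-frames` is a hypothetical name, frames are modeled as maps with a `:pts` presentation timestamp, and frames are assumed to arrive on `in` in presentation order. It is not a core.async.flow proc.

```clojure
(require '[clojure.core.async :as a])

(defn trim-frames
  "Hypothetical trimmer stage: forwards frames from `in` to `out` while their
  :pts is at or before `end-ts`. On the first frame past `end-ts` it closes
  `in` (telling the upstream decoder to stop) and `out` (ending the stream)."
  [in out end-ts]
  (a/go-loop []
    (let [frame (a/<! in)]
      (cond
        ;; Upstream finished on its own: just end the output stream.
        (nil? frame)
        (a/close! out)

        ;; Past the end timestamp: signal upstream by closing our input.
        (> (:pts frame) end-ts)
        (do (a/close! in)
            (a/close! out))

        :else
        (do (a/>! out frame)
            (recur))))))

;; Usage: four decoded frames, trimmed at end timestamp 1.
(let [in  (a/chan 10)
      out (a/chan 10)]
  (doseq [pts [0 1 2 3]]
    (a/>!! in {:pts pts}))
  (trim-frames in out 1)
  (a/<!! (a/into [] out)))
;; => [{:pts 0} {:pts 1}]
```

A frame decoder written like the producer in the earlier paragraph would see its next put on `in` return false, and could close its own input in turn, which is the cascade that has no obvious counterpart in core.async.flow.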