@ghaskins are we both just missing core.async/map?
Unless I am missing something, I don’t think so. I think I am explaining my problem poorly. I have various scenarios with the same dynamic, but one way to explain it would be to consider merging two streams in a Cartesian-product style. An example would be a two-stage sharded db query. Stage 1 queries a primary result set, and stage 2 queries a secondary set based on the primary results and a (range shards). I can do N:1 with async/reduce, or N:N with async/map. But so far the only thing I have found that lets me do N:M is to (abuse) pipeline-async.
Ultimately I feed the stream of shard-augmented results to another pipeline stage that parallelizes the final db query
(And, importantly, caps the maximum load I place on the db)
So, if my primary result set had [:foo :bar :baz] and there were three shards, I’d ultimately have a stream with 9 messages, [:foo 1] [:foo 2] [:foo 3] [:bar 1] …
So my N:M expansion is 3:9 in that case. I hadn’t found anything that lets me change the shape of the stream like that short of pipeline-async (or custom go routines)
But I feel like I may be missing something
Ultimately everything works, I was just curious to learn some more idiomatic approach, should one exist
A transducer, e.g. mapcat, can output any number of values per input value and can be attached to a channel
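For reference, a minimal sketch of that suggestion, using the [:foo :bar :baz] and three-shard example from earlier in the thread. The names shard-ids, expand-xf and expanded are made up for illustration, the buffer size of 16 is arbitrary, and onto-chan! assumes a reasonably recent core.async (older releases call it onto-chan):

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical shard ids; in the real code this would be (range shards).
(def shard-ids [1 2 3])

;; Transducer: each primary result expands into one [result shard] pair per shard.
(def expand-xf
  (mapcat (fn [result]
            (map (fn [shard] [result shard]) shard-ids))))

;; A channel that carries a transducer must be given a buffer.
(def expanded (a/chan 16 expand-xf))

;; Feed the primary results; onto-chan! closes the channel when the collection is exhausted.
(a/onto-chan! expanded [:foo :bar :baz])

;; Collect everything for inspection: nine pairs, starting [:foo 1] [:foo 2] [:foo 3] [:bar 1].
(def pairs (a/<!! (a/into [] expanded)))
```

In a real pipeline, expanded would presumably be handed to the next pipeline stage as its input channel rather than drained with a/into.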
That’s helpful @jkr.sw . I’ll check that out
@jkr.sw at least logically, that appears to be exactly what I needed. Thank you. I am curious about one thing: will introducing mapcat as a transducer in the stream retain the “expand on the fly” behavior I have now, or will it buffer the result until the upstream channel closes?
My resultset and shard ranges can be large, so I need to be able to start the secondary query while the primary is still streaming in
I can always test the above dynamic, I’m just curious about your thoughts
No, the upstream channel can stay open; the transducer itself doesn't buffer anything
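A small sketch to check that behavior, under assumed names (upstream, expanded) and an arbitrary buffer size of 8. It puts one value on an upstream channel that is still open and reads the expanded pairs immediately:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical primary-results channel, left open while we read downstream.
(def upstream (a/chan))

;; Downstream channel that expands each result into three [result shard] pairs.
(def expanded
  (a/chan 8 (mapcat (fn [result]
                      (map (fn [shard] [result shard]) (range 1 4))))))

;; pipe copies values from upstream to expanded and closes expanded when upstream closes.
(a/pipe upstream expanded)

(a/>!! upstream :foo) ; upstream is still open after this put

;; The pairs for :foo are available right away, before upstream is closed.
(def first-three [(a/<!! expanded) (a/<!! expanded) (a/<!! expanded)])

(a/close! upstream)
(def after-close (a/<!! expanded)) ; nil once expanded has been closed by pipe
```

One caveat, as far as I understand core.async: the full expansion of a single input is added to the channel's buffer in one step, even if that exceeds the nominal buffer size, so a very large shard range per input can occupy memory for that one input at a time.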
Ok, perfect! This was extremely helpful. Thanks again
No problem 🙂