core-async

James Amberger 2024-06-15T14:45:42.043259Z

@ghaskins are we both just missing core.async/map?

ghaskins 2024-06-15T14:52:16.283149Z

Unless I am missing something, I don’t think so. I think I am explaining my problem poorly. I have various scenarios with the same dynamic, but one way to explain it would be to consider merging two streams in a Cartesian-product style. An example would be a two-stage sharded db query. Stage 1 queries a primary result set, and stage 2 queries a secondary set based on the primary results and a (range shards). I can do N:1 with async/reduce, or N:N with async/map. But so far the only thing I have found that lets me do N:M is to (abuse) pipeline-async.

ghaskins 2024-06-15T14:53:24.698659Z

Ultimately I feed the stream of shard-augmented results to another pipeline stage that parallelizes the final db query

ghaskins 2024-06-15T14:53:49.976079Z

(And, importantly, caps the maximum load I place on the db)

ghaskins 2024-06-15T14:55:52.567739Z

So, if my primary result set had [:foo :bar :baz] and there were three shards, I’d ultimately have a stream with 9 messages, [:foo 1] [:foo 2] [:foo 3] [:bar 1] …

ghaskins 2024-06-15T14:58:06.364669Z

So my N:M expansion is 3:9 in that case. I hadn’t found anything that lets me change the shape of the stream like that short of pipeline-async (or custom go routines)

ghaskins 2024-06-15T14:58:22.749279Z

But I feel like I may be missing something

ghaskins 2024-06-15T15:01:16.340869Z

Ultimately everything works, I was just curious to learn some more idiomatic approach, should one exist

Jan K 2024-06-15T15:02:47.403269Z

A transducer, e.g. mapcat, can output any number of values per input value and can be attached to a channel
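
A minimal sketch of that suggestion, using the [:foo :bar :baz] / three-shard example from earlier in the thread. The names `shards`, `expand`, `primary`, and `expanded` are hypothetical, and `to-chan!` assumes core.async 1.2 or later; this is an illustration, not the code ghaskins actually runs.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical shard ids; in the thread these come from (range shards).
(def shards [1 2 3])

;; One primary row in, one [row shard] pair out per shard.
(defn expand [row]
  (map (fn [shard] [row shard]) shards))

;; Stand-in for the stage 1 result stream.
(def primary (a/to-chan! [:foo :bar :baz]))

;; A channel with a transducer must have a buffer; mapcat does the N:M expansion.
(def expanded (a/chan 1 (mapcat expand)))

;; Forward stage 1 into the expanding channel; pipe closes `expanded`
;; once `primary` is closed and drained.
(a/pipe primary expanded)

;; Collect everything for inspection only; a real stage 2 would consume
;; `expanded` directly (for example as the input of a pipeline).
(def result (a/<!! (a/into [] expanded)))
```

Here `result` holds nine pairs in input order, starting with [:foo 1] [:foo 2] [:foo 3] [:bar 1].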

ghaskins 2024-06-15T15:03:28.010459Z

That’s helpful @jkr.sw . I’ll check that out

ghaskins 2024-06-15T15:18:37.629909Z

@jkr.sw at least logically, that appears to be exactly what I needed. Thank you. I am curious about one thing: will introducing mapcat as a transducer in the stream retain the “expand on the fly” behavior I have now, or will it buffer the result until the upstream channel closes?

ghaskins 2024-06-15T15:19:53.849469Z

My resultset and shard ranges can be large, so I need to be able to start the secondary query while the primary is still streaming in

ghaskins 2024-06-15T15:20:36.650939Z

I can always test the above dynamic, I’m just curious about your thoughts

Jan K 2024-06-15T15:20:42.379969Z

No, the upstream channel can stay open, the transducer itself doesn't buffer anything
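
One way to check this at the REPL, as a sketch under assumed names (`upstream`, `streamed`, and `first-three` are hypothetical): put a single value on an upstream channel that is still open and read its expanded values before anything is closed.

```clojure
(require '[clojure.core.async :as a])

;; Unbuffered upstream that stays open while we read downstream.
(def upstream (a/chan))

;; Expanding channel: each row becomes three [row shard] pairs.
(def streamed
  (a/chan 1 (mapcat (fn [row] (map #(vector row %) [1 2 3])))))

(a/pipe upstream streamed)

;; Put one value; upstream is NOT closed at this point.
(a/>!! upstream :foo)

;; The expanded values can be taken right away.
(def first-three
  [(a/<!! streamed) (a/<!! streamed) (a/<!! streamed)])

;; Only now close upstream; pipe then closes `streamed`,
;; so the next take returns nil.
(a/close! upstream)
(def after-close (a/<!! streamed))
```

A related detail worth keeping in mind: all outputs produced for a single input are added to the channel's buffer in one step, so with an expanding transducer the buffer can temporarily hold more items than its nominal size.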

ghaskins 2024-06-15T15:21:00.525789Z

Ok, perfect! This was extremely helpful. Thanks again

Jan K 2024-06-15T15:21:10.362789Z

No problem 🙂