Hi, suppose I have N objects and M operations (some of which do network I/O). I want to run the operations in sequence for each of the N objects, while allowing parallelism across the objects where possible. There is one sync (fan-in) point in the pipeline, let's say at operation M-1. What's the best/easiest way to do this in core.async? (Also, this is in ClojureScript, so thread is not an option.)


Check out pipeline and pipeline-async.
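For reference, a minimal sketch of what a single pipeline-async stage might look like. The names fetch-async and run-stage are made up for illustration, the timeout stands in for a real network call, and to-chan! assumes core.async 1.2.603 or later. In ClojureScript the namespace is cljs.core.async.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical async operation. pipeline-async calls it with an input value
;; and a fresh result channel, which the operation must close when done.
(defn fetch-async [x result-ch]
  (a/go
    (a/<! (a/timeout 10))          ; stand-in for network I/O
    (a/>! result-ch (* x 10))
    (a/close! result-ch)))

(defn run-stage
  "Runs fetch-async over inputs with up to 4 operations in flight.
  Returns a channel that yields a vector of results in input order."
  [inputs]
  (let [in  (a/to-chan! inputs)
        out (a/chan)]
    (a/pipeline-async 4 out fetch-async in)
    (a/into [] out)))
```

The outputs come back in input order even though the operations overlap, which is the ordering guarantee pipeline-async provides.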


Also, you say “call the sequence of operations in order”. You can only guarantee that the operations will begin in order, and that the results will be returned in order. If any of the operations are dependent on each other, fan-out/fan-in wouldn’t work.


@jgdavey: I did check out pipeline and pipeline-async and they won't work for this (at least not without composition with other functions [hence the question here]).


Regarding the second point-- isn't 'fan-in' (aka join) exactly what would facilitate observing any dependencies between the stages?


so, let's say I have stages: A B C D E F. I need a global 'sync point'/'fan-in'/'join' at stage D. A B C are free to operate in parallel across the objects but must be executed in order for each individual object (likewise for E and F).
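One way the A B C / D / E F shape could be sketched with plain go blocks. This is only an illustration under some assumptions: every stage is a function from a value to a channel that yields one non-nil result, the join (stage D) is a synchronous function over all per-object results, and objects is non-empty. run-stages, run-with-join and async-stage are hypothetical names, not part of core.async.

```clojure
(require '[clojure.core.async :as a])

(defn run-stages
  "Runs the async stages one after another for a single object.
  Returns a channel that yields the final value."
  [stages x]
  (a/go-loop [v x, remaining (seq stages)]
    (if remaining
      (recur (a/<! ((first remaining) v)) (next remaining))
      v)))

(defn run-with-join
  "Fans out pre-stages across objects, joins on all results, then fans out
  post-stages. Returns a channel that yields a vector of final results."
  [pre-stages join-fn post-stages objects]
  (let [run-all (fn [stages xs]
                  ;; Start every object at once; a/map emits only after
                  ;; each channel has delivered its value (the fan-in).
                  (a/map vector (mapv #(run-stages stages %) xs)))]
    (a/go
      (let [pre-results (a/<! (run-all pre-stages objects))
            joined      (join-fn pre-results)]   ; stage D sees everything
        (a/<! (run-all post-stages joined))))))

;; Usage with made-up stages; the timeout stands in for network I/O.
(defn async-stage [f]
  (fn [x] (a/go (a/<! (a/timeout 5)) (f x))))

(def result-ch
  (run-with-join
    [(async-stage inc) (async-stage #(* 2 %))]                    ; A, B
    (fn [xs] (let [total (reduce + xs)] (mapv #(vector % total) xs))) ; D
    [(async-stage (fn [[x total]] (- total x)))]                  ; E
    [1 2 3]))
;; result-ch yields [14 12 10]
```

Per-object ordering comes from the go-loop taking one stage at a time, and the global sync point comes from waiting on all of the per-object channels before calling the join.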


Looks like the easiest answer here isn't core.async at all but rather cats' alet.


What about multiple pipelines with intermediate channels?
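A rough sketch of that idea, assuming each operation has the two-argument shape pipeline-async expects (input value plus a result channel to close). chain-pipelines and async-op are hypothetical helpers, and to-chan! assumes core.async 1.2.603 or later.

```clojure
(require '[clojure.core.async :as a])

(defn chain-pipelines
  "Connects each async stage to the next through an intermediate channel.
  Returns the output channel of the last stage."
  [n afs in]
  (reduce (fn [from af]
            (let [to (a/chan n)]
              (a/pipeline-async n to af from)
              to))
          in
          afs))

;; Hypothetical helper: lifts a plain function into a pipeline-async operation.
(defn async-op [f]
  (fn [v result-ch]
    (a/go
      (a/<! (a/timeout 5))         ; stand-in for network I/O
      (a/>! result-ch (f v))
      (a/close! result-ch))))

;; a/into acts as the fan-in: it yields only after the last pipeline has
;; closed its output, i.e. after every object has passed through every stage.
(def joined-ch
  (a/into [] (chain-pipelines 4
                              [(async-op inc) (async-op str)]
                              (a/to-chan! [1 2 3]))))
;; joined-ch yields ["2" "3" "4"]
```

After the join, the collected vector could be fed back through to-chan! into a second chain for the remaining stages.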


Don't have time to think about this deeper yet, but I don't see a reason why you couldn't figure something out by just composing channels and iterating over structures of those channels as needed


if you check the core.async source, that's essentially what most of the more complex built-in functions are doing anyway


if you need to do something in order, just take off a particular channel in order


if something can't happen until something else and must happen in order, loop and block as needed


you can do some things that might help, like something similar to the WaitOne/WaitAll semantics from the .NET/Windows world... this can be done with callbacks on put!/take! and an atom keeping count of everyone you're waiting on, for when you want things to run in parallel until x needs to happen
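A minimal sketch of the WaitAll-style idea with take! callbacks and atoms. wait-all is a hypothetical name, not a core.async function, and it assumes each channel delivers exactly one value (a closed channel shows up as nil in the result).

```clojure
(require '[clojure.core.async :as a])

(defn wait-all
  "Returns a channel that yields a vector of results, in the order of chs,
  once every channel in chs has delivered one value."
  [chs]
  (let [n       (count chs)
        results (atom (vec (repeat n nil)))
        pending (atom n)             ; how many channels we are still waiting on
        done    (a/chan 1)]
    (when (zero? n)
      (a/put! done [])
      (a/close! done))
    (doseq [[i ch] (map-indexed vector chs)]
      (a/take! ch
               (fn [v]
                 (swap! results assoc i v)
                 ;; Whoever brings the count to zero publishes the results.
                 (when (zero? (swap! pending dec))
                   (a/put! done @results)
                   (a/close! done)))))
    done))
```

Everything before the sync point can run in parallel; the step that needs all results just takes from the channel wait-all returns. For the WaitOne side, the built-in alts! already completes on whichever of several channels is ready first.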