I'm trying out core.async.flow. For part of the flow, I have a compute workload where each call can run in parallel. With regular core.async, you could use pipeline. For my use case, the outputs don't actually need to be forwarded in the same order as the inputs. How would you set this up using core.async.flow?
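For comparison, here is a minimal sketch of plain core.async pipeline. It shows why pipeline is stricter than needed here: outputs always come out in input order, even when a later job finishes first. The run-ordered and slow-identity names are illustrative only. Each input is a simulated job duration in milliseconds.

```clojure
(ns example.ordered-pipeline
  (:require [clojure.core.async :as a]))

;; Simulated work: sleep for `ms` milliseconds, then pass the value through.
(def slow-identity
  (map (fn [ms] (Thread/sleep (long ms)) ms)))

(defn run-ordered
  "Runs `inputs` through a/pipeline with parallelism n and collects the
  outputs into a vector."
  [n inputs]
  (let [in  (a/to-chan! inputs)
        out (a/chan (count inputs))]
    ;; pipeline closes `out` once `in` is closed and all jobs are done
    (a/pipeline n out slow-identity in)
    (a/<!! (a/into [] out))))

(run-ordered 3 [300 10 100])
;; => [300 10 100]  (input order, although the 10 ms job finishes first)
```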
I ended up with something similar to https://gist.github.com/phronmophobic/486372516e844684f18a51f856bfa8c2
My initial thought was to create n duplicate processes for this part of the flow. However, that has a couple of downsides:
• It seems like I'm deciding on the level of parallelism in the wrong part of the code.
• Balancing the workload seems tricky. If one worker gets a few longer jobs, I don't want its queue to start growing.
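Outside of flow, one common way to avoid per-worker queues is for all workers to take from a single shared input channel. A worker stuck on a long job simply stops taking new jobs, and idle workers pick them up instead. The following is a minimal plain core.async sketch of that idea, not a core.async.flow process. The unordered-workers function is a hypothetical helper.

```clojure
(ns example.unordered-workers
  (:require [clojure.core.async :as a]))

(defn unordered-workers
  "Hypothetical helper, not a core.async function. Starts n threads that all
  take from the same channel `in`, apply f, and put each result on `out` as
  soon as it is ready. Closes `out` after `in` is closed and every worker has
  finished. f must not return nil (nil cannot be put on a channel)."
  [n f in out]
  (let [workers (mapv (fn [_]
                        (a/thread
                          (loop []
                            (when-some [v (a/<!! in)]
                              (a/>!! out (f v))
                              (recur)))))
                      (range n))]
    (a/thread
      ;; each a/thread channel closes when its worker loop ends
      (run! a/<!! workers)
      (a/close! out))
    out))

(let [in  (a/to-chan! [300 10 100])
      out (a/chan 3)]
  (unordered-workers 3 (fn [ms] (Thread/sleep (long ms)) ms) in out)
  ;; values arrive in completion order, so the 10 ms job usually comes first
  (a/<!! (a/into [] out)))
```

The parallelism n is still chosen where the workers are started, so this addresses the balancing concern but not the "wrong part of the code" one.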
I guess another option would be to create a single process that accepts the parallelism as a parameter. This process would provide its own ProcLauncher, which would submit jobs, with the desired parallelism, to the :compute ExecutorService provided by the resolver.
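The core of that idea can be sketched without the ProcLauncher wiring, whose protocol details are not shown here. The ExecutorService is passed in as an argument. Inside a flow, it would presumably be the :compute executor from the resolver, but that is an assumption, not something this sketch does. A Semaphore caps the number of in-flight jobs at n, independently of how many threads the executor has. The run-bounded! function is hypothetical, and error handling is omitted.

```clojure
(ns example.bounded-exec
  (:require [clojure.core.async :as a])
  (:import (java.util.concurrent ExecutorService Executors Semaphore)))

(defn run-bounded!
  "Hypothetical helper, not a core.async.flow API. Takes values from `in` and
  runs (f v) on the supplied ExecutorService with at most n jobs in flight,
  putting each result on `out` as soon as it completes. Closes `out` once `in`
  is closed and all jobs have finished. f must not return nil."
  [^ExecutorService exec n f in out]
  (let [permits (Semaphore. (int n))]
    (a/thread
      (loop []
        (if-some [v (a/<!! in)]
          (do
            (.acquire permits)            ; block until a slot is free
            (.execute exec
                      (fn []
                        (try
                          (a/>!! out (f v))
                          (finally
                            (.release permits)))))
            (recur))
          (do
            (.acquire permits (int n))    ; all n permits back => no jobs running
            (a/close! out)))))
    out))

(let [exec (Executors/newFixedThreadPool 8)   ; stand-in for a shared executor
      in   (a/to-chan! (range 10))
      out  (a/chan 10)]
  (run-bounded! exec 3 inc in out)             ; at most 3 jobs at a time
  (let [results (a/<!! (a/into [] out))]
    (.shutdown exec)
    results))
```

Here, the executor's size and the stage's parallelism are separate knobs. That fits the wish to pick parallelism per stage while sharing one executor.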
https://clojurians.slack.com/archives/C05423W6H/p1746639626781339
If I want results as soon as they are available and don't care about preserving order, how would I do that?
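In plain core.async, one simple way to get results in completion order is to start each job on its own a/thread and a/merge the resulting channels. a/merge forwards values as they arrive and closes after every source channel has closed. This sketch is unbounded (one thread per job) and is not a flow process. The results-as-completed function is a hypothetical helper.

```clojure
(ns example.merge-as-completed
  (:require [clojure.core.async :as a]))

(defn results-as-completed
  "Hypothetical helper. Starts one a/thread per input and merges their result
  channels, so values arrive on the returned channel in completion order.
  Parallelism is unbounded (one thread per job). A nil result from f is
  dropped, because a/thread just closes its channel in that case."
  [f xs]
  (a/merge (mapv (fn [x] (a/thread (f x))) xs)))

;; values arrive in completion order, so the 10 ms job usually comes first
(a/<!! (a/into [] (results-as-completed
                   (fn [ms] (Thread/sleep (long ms)) ms)
                   [300 10 100])))
```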
Another issue I see with the flow/futurize approach is that it ignores the :mixed-exec/:io-exec/:compute-exec config parameters passed to create-flow. Processes that call flow/futurize inside a flow created with custom ExecutorServices via :mixed-exec/:io-exec/:compute-exec will use the "wrong" ExecutorService.
flow/create-flow lets you provide ExecutorServices for the various workload types. However, it looks like the flow implementation (https://github.com/clojure/core.async/blob/28f3afcc31e248a11ed7914f61fb3ad65f91f9c5/src/main/clojure/clojure/core/async/flow/impl.clj#L195C13-L195C21) uses the library's default ExecutorService directly in inject, via futurize:
(futurize do-io {:exec :io})
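One way to make sure background work runs on a specific executor is to pass the ExecutorService explicitly instead of naming a workload keyword. The following sketch uses a hypothetical futurize-on helper (not the core.async.flow futurize) and a thread-naming executor, so you can see which pool the work actually ran on. How a process would obtain the ExecutorServices given to create-flow is not shown and remains an open assumption.

```clojure
(ns example.explicit-exec
  (:require [clojure.core.async :as a])
  (:import (java.util.concurrent ExecutorService Executors ThreadFactory)))

(defn futurize-on
  "Hypothetical helper, not core.async.flow's futurize. Returns a fn that
  submits (apply f args) to the given ExecutorService and immediately returns
  a promise-chan that will receive the result (f must not return nil)."
  [^ExecutorService exec f]
  (fn [& args]
    (let [ch (a/promise-chan)]
      (.execute exec (fn [] (a/put! ch (apply f args))))
      ch)))

(defn named-executor
  "Single-thread executor whose thread is named `thread-name`, which makes it
  easy to check where work actually ran."
  ^ExecutorService [thread-name]
  (Executors/newSingleThreadExecutor
   (reify ThreadFactory
     (newThread [_ r]
       (Thread. ^Runnable r ^String thread-name)))))

(let [io-exec             (named-executor "my-io-thread")
      current-thread-name (futurize-on io-exec #(.getName (Thread/currentThread)))]
  (println (a/<!! (current-thread-name)))   ; prints my-io-thread
  (.shutdown io-exec))
```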
Should we expect that some flow management might happen outside of the executor services passed to create-flow?