I have a hard time implementing this idea: I am using missionary flows to process data by relying on external services. This services both send me data in and I can take actions which will effect the stream of data coming in. So I now have a lot of protocols that basically give me missionary flows and expose side-effecting functions. My running system works. I now want to mock this services, and this is not so easy since services accumulate state over time. So what I want to do is to have an input of events. In a simple form this is (m/seed [:a :c :d :e :f]) What I now want to do for each of this items in the seed I want to run (fn processing [event batched-service-flow]) So the processing function will get the event (example :a) and also the events of the service-flow that have been emitted until the last time the function was run. Lets say :c results in making action calls that the services return [:C] and :e leads to [:E1 :E2]. The input that the processing function should get is [:a []] [:c []] [:d [:C]] [:e []] [:f [:E1 :E2]] [nil []]. Note that :c triggers :C but this is only available in the next batch; same for :e. My idea is to have the same interface for the mock as I have for the running system; so my idea is that the mock services will also expose flows and also allow me to run actions on them. To get the batched-service flow, it somehow seems to work to use m/group-by for this. I thought I can use missionary rendevous to simulate to run an action in a service. After playing around for days with it, I still could not get it done. This is what I have so far. https://github.com/clojure-quant/missionary-test/blob/main/src/demo/batch_v6.clj I have managed to write a processor-fn and a service-fn. But when running it all together, I think I run into deadlocks somewhere. I am not sure if what I am trying to do is smart, of outright stupid. Any comments welcome!
you have an indirect call to m/?, this is currently UB (it's a common pitfall, it should be an error)
ap[create-action-flow] -> process-job -> action! -> ?
Yes. Action! Needs to wait until the rendevous call returns.
@leonoel I managed to get it going when I was only running the group-by operation on m/seed generated data. Do you say that it is not possible to group a flow when it is generated by an ap process that has m/? or m/?> inside?
I have the following problem: A flow only gets active, when consumed. When a flow is produced by something that has state, then I cannot simply stop consuming a flow and resuming later. I am not quite clear why in the example you came up with a week ago it was working ok, and in a more complex scenario it does not.
@leonoel Is what you are saying is that it would be all ok, if the m/? is INSIDE the ap block? So what you say is that m/? is fine to use inside the main thread only. And if there is an AP, then all use of m/? needs to happen either inside a m/sp or m/ap.
BTW: I think you should add an m/await function. And it can be just an alias for m/sp. But m/await is allowed outside a m/sp or m/ap. And m/? is only allowed inside. I am pretty sure that one can write a linter config, that checks if m/? is being used outside and bring an error. Adding this alias, somehow would make this possible. Perhaps @borkdude who wrote clj-kondo knows an answer?
https://github.com/clojure-quant/missionary-test/blob/main/.clj-kondo/hooks/mawait.clj
https://github.com/clojure-quant/missionary-test/blob/main/.clj-kondo/config.edn
https://github.com/clojure-quant/missionary-test/blob/main/deps.edn
@leonoel Luckily clj-kondo in September had a new version that allows this to be done. I copied the links how I did it. Most likely this config needs to be improved.
./src/demo/batch_v6.clj:36:3: error: m/? is allowed only inside m/sp or m/ap
clj-kondo did find the error!
@leonoel I think you could add a default clj-kondo config for missionary. I this case anyone who uses missionary would get the same linting warnings or errors, without having to write a specific custom config.
@leonoel Many thanks for your help in any case! You helped me a lot!