missionary

awb99 2024-10-08T18:37:27.044189Z

I believe I found a use case where missionary's stream operators are not sufficient.

awb99 2024-10-08T18:39:28.194219Z

I put the example here: https://github.com/clojure-quant/missionary-test/blob/main/src/demo/batch_combiner.clj

awb99 2024-10-08T18:40:49.782069Z

So the use case is to read from multiple discrete flows and process their values in batches over time.

awb99 2024-10-08T18:44:43.582159Z

I can use m/relieve to aggregate all the new values that have become available up to now.
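
A minimal sketch of that aggregation, assuming each value is first wrapped in a vector so that `into` can merge whatever is still pending. `batched`, `producer` and `consumer` are hypothetical names for illustration and are not part of missionary or of the linked example:

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper: wrap each value in a vector, then let m/relieve
;; merge everything that arrived since the last transfer into one batch.
(defn batched [flow]
  (->> flow
       (m/eduction (map vector))
       (m/relieve into)))

;; Producer emitting the numbers 0..9, one every 30 ms.
(def producer
  (m/ap (let [n (m/?> (m/seed (range 10)))]
          (m/? (m/sleep 30 n)))))

;; Slow consumer: spends 100 ms on each batch, so several values
;; accumulate between two transfers.
(def consumer
  (m/reduce conj
    (m/ap (let [batch (m/?> (batched producer))]
            (m/? (m/sleep 100 batch))))))

(comment
  ;; Blocks the calling thread on the JVM; returns a vector of batches.
  (m/? consumer))
```

The batch boundaries depend on timing, but concatenating the batches should give back every produced value in order.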

awb99 2024-10-08T18:44:47.237059Z

This works.

awb99 2024-10-08T18:45:51.132759Z

When I use m/sample to read from this batched stream, I essentially have to feed it a continuous flow, so I will inevitably get duplicated values from the streams that I want to process in a batch.

awb99 2024-10-08T18:47:26.769109Z

The operator that I believe is missing could be m/flow-or, which would return either the value from the flow or a fixed value.

awb99 2024-10-08T18:53:44.506279Z

I believe that in an ap process neither ?> nor ?< would work for this, because both depend on getting a value.

awb99 2024-10-08T18:55:52.858809Z

Or, instead of m/flow-or, perhaps an m/batch could be added that works similarly to m/sample but is designed for discrete processes.

awb99 2024-10-08T18:56:34.412869Z

I hope I am not talking nonsense here... I spent 2 days thinking about this, reading the docs, the cookbook and the missionary code over and over again.

awb99 2024-10-08T18:56:37.994789Z

Thanks!

Hendrik 2024-10-15T07:28:17.980419Z

Since nobody has answered yet, I will try. I think this is possible with missionary's current feature set. m/relieve can work, but you have to cancel the flow after a batch has completed and create a new flow for the next batch. Otherwise you get the behaviour that you have observed. m/relieve returns a continuous flow, which means it will always return the latest value it has seen when it is asked to produce a value. I think you have to use m/ap to achieve what you want. Maybe take a closer look at throttle and debounce. I think this will point you in the right direction: https://github.com/leonoel/missionary/wiki/Debounce-and-Throttle#debounce Unfortunately, I have no time to create and test a solution myself right now.
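
As a starting point, here is a sketch of the debounce pattern that wiki page describes. It assumes a recent missionary version in which `(m/amb)` with no arguments emits nothing, and it is untested against the batching use case above:

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; Emits a value only if no newer value arrives within `delay` ms.
;; m/?< switches to the newest input and cancels the pending sleep of
;; the previous one, which surfaces as a Cancelled exception.
(defn debounce [delay flow]
  (m/ap (let [x (m/?< flow)]
          (try (m/? (m/sleep delay x))
               (catch Cancelled _ (m/amb))))))

(comment
  ;; Blocks on the JVM; the last input value is always emitted.
  (m/? (m/reduce conj (debounce 50 (m/seed [1 2 3])))))
```

The same shape (an m/ap block that reacts to each input with m/?< and decides whether to emit) may be adaptable to collecting values into a batch instead of dropping them.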