Is it possible to cancel a flow? I have a multiplexer that reads its input flow and then starts new flows like this: (m/amb (m/?> (create-order order-action)) The beauty of this is that I get all outputs of the child flow as outputs of the parent flow. But now I need a way to cancel the child flow. And in the api docs there is only ! Which seems to operate on a task level only. This is my code: https://github.com/clojure-quant/quanta/blob/master/lib/trade/src/quanta/trade/broker/paper/broker.clj when the parent flow.gets :order-cancel then I want to cancel the corresponding child flow.
Have you considered group-by ? Then you can terminate a group with take-while - something like :
(defn create-single-order [opts order]
(m/eduction (take-while some?)
(m/ap (let [order-data (m/?> 2 order)]
(case (:type order-data)
:new-order (m/?> (random-fill-flow opts order-data))
:cancel-order nil)))))
(defn create-order-flow-switch [opts order-input-flow]
(m/ap (let [[_ order] (m/?> ##Inf (m/group-by :order-id order-input-flow))]
(m/?> (create-single-order opts order)))))(untested)
BTW m/amb with one argument is identity
https://clojurians.slack.com/archives/CL85MBPEF/p1713359533591059?thread_ts=1713258885.018919&cid=CL85MBPEF Has anyone played with vthreads and missionary since then?