I am having issues in consolidating two flows into one. I tried with m/latest m/amb= But I am not able to combine them:
(defn create-random-fill-broker [{:keys [fill-probability wait-seconds] :as opts} order-input-flow]
(let [orders (atom {})
fill-flow (m/ap
(loop [i 0]
(let [fills (create-random-fills fill-probability orders)]
(when (seq? fills)
(m/amb fills))
(m/? (m/sleep (* 1000 wait-seconds)))
(recur (inc i)))))
response-flow (m/ap
(let [input-msg (m/?> order-input-flow)]
(process-incoming-message orders opts input-msg)))
output-flow (m/signal (m/latest concat fill-flow response-flow))
broker (random-fill-broker. opts orders order-input-flow output-flow)]
broker))```Thanks @rcmerci I will try that.
@rcmerci your mix function works! Thanks for that! The (count flows) .. this will ensure that each flow will at maximum get one process?
(defn mix
"Return a flow which is mixed by `flows`"
[& flows]
(m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
I usually mix flows using this fn(m/?> (count flows) (m/seed flows)) will generate N(N=`(count flows)`) “processes”, each flow has one “process”.
Ahhh! Thanks! Smart idea!