missionary

awb99 2024-07-11T20:31:23.860199Z

I am having issues in consolidating two flows into one. I tried with m/latest m/amb= But I am not able to combine them:

(defn create-random-fill-broker [{:keys [fill-probability wait-seconds] :as opts} order-input-flow] (let [orders (atom {}) fill-flow (m/ap (loop [i 0] (let [fills (create-random-fills fill-probability orders)] (when (seq? fills) (m/amb fills)) (m/? (m/sleep (* 1000 wait-seconds))) (recur (inc i))))) response-flow (m/ap (let [input-msg (m/?> order-input-flow)] (process-incoming-message orders opts input-msg))) output-flow (m/signal (m/latest concat fill-flow response-flow)) broker (random-fill-broker. opts orders order-input-flow output-flow)] broker))```

awb99 2024-07-12T14:34:11.369739Z

Thanks @rcmerci I will try that.

awb99 2024-07-12T20:18:40.824989Z

@rcmerci your mix function works! Thanks for that! The (count flows) .. this will ensure that each flow will at maximum get one process?

zy C 2024-07-12T06:11:29.209959Z

(defn mix
  "Return a flow which is mixed by `flows`"
  [& flows]
  (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
I usually mix flows using this fn

zy C 2024-07-13T10:05:51.830149Z

(m/?> (count flows) (m/seed flows)) will generate N(N=`(count flows)`) “processes”, each flow has one “process”.

awb99 2024-07-16T02:09:24.837259Z

Ahhh! Thanks! Smart idea!