This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
If I have two continuous flows which emit values which are distinguishable by their contents, how can I create a consumer that pulls values from both flows as quickly as either has a value to give?
(def flow1
(m/ap
(loop []
(let [sleep (rand-int 10)]
(m/? (m/sleep sleep))
(m/amb
[:a sleep]
(recur))))))
(def flow2
(m/ap
(loop []
(let [sleep (rand-int 20)]
(m/? (m/sleep sleep))
(m/amb
[:b sleep]
(recur))))))
(m/? (m/reduce conj (m/eduction (take 20) ???)))
m/latest?
Actually, maybe I shouldn't have used the word continuous. What I meant was "potentially infinite, backpressured flows"
m/latest
seems a little awkward to use in that case.
Here's a modified version:
(def flow1
(m/ap
(m/amb
:init-a
(let [delay (m/?> (m/seed [1 2 3]))]
[:a (m/? (m/sleep delay delay))]))))
(def flow2
(m/ap
(m/amb
:init-b
(let [delay (m/?> (m/seed [10 20 30]))]
[:b (m/? (m/sleep delay delay))]))))
;; desired reduction
[[:a 1] [:a 2] [:a 3] [:b 10] [:b 20] [:b 30]]
(m/? (m/reduce conj (m/latest vector flow1 flow2)))
;; produces
[[:init-a :init-b]
[[:a 1] :init-b]
[[:a 2] :init-b]
[[:a 3] :init-b]
[[:a 3] [:b 10]]
[[:a 3] [:b 20]]
[[:a 3] [:b 30]]]
I don't care about a consistent view of snapshot of the values across the two flows, I just want the values in the roughly the order they were ready to be emitted from either.
It almost seems like I want both flows to write to a m/mbox
or something like it but I don't want an unbounded buffer.oh you want to race them
you want “mix” there is an implementation in a closed github issue
the essence of this problem is amb=
(m/reduce conj (m/ap (m/?> (m/amb= flow1 flow2))))
https://github.com/hyperfiddle/electric/blob/cc55772f18bc46373f131e092fc20055c8062b59/src/contrib/missionary_contrib.cljc#L6 missionary only provides the foundational primitives