Fork me on GitHub
#missionary
<
2023-10-22
>
adamfrey23:10:01

If I have two continuous flows which emit values which are distinguishable by their contents, how can I create a consumer that pulls values from both flows as quickly as either has a value to give?

(def flow1
    (m/ap
      (loop []
        (let [sleep (rand-int 10)]
          (m/? (m/sleep sleep))
          (m/amb
            [:a sleep]
            (recur))))))
  
  (def flow2
    (m/ap
      (loop []
        (let [sleep (rand-int 20)]
          (m/? (m/sleep sleep))
          (m/amb
            [:b sleep]
            (recur))))))

  (m/? (m/reduce conj (m/eduction (take 20) ???)))

adamfrey00:10:50

Actually, maybe I shouldn't have used the word continuous. What I meant was "potentially infinite, backpressured flows" m/latest seems a little awkward to use in that case. Here's a modified version:

(def flow1
    (m/ap
      (m/amb
        :init-a
        (let [delay (m/?> (m/seed [1 2 3]))]
          [:a (m/? (m/sleep delay delay))]))))

  (def flow2
    (m/ap
      (m/amb
        :init-b
        (let [delay (m/?> (m/seed [10 20 30]))]
          [:b (m/? (m/sleep delay delay))]))))

  ;; desired reduction
  [[:a 1] [:a 2] [:a 3] [:b 10] [:b 20] [:b 30]]

  (m/? (m/reduce conj (m/latest vector flow1 flow2)))
  ;; produces
  [[:init-a :init-b]
   [[:a 1] :init-b]
   [[:a 2] :init-b]
   [[:a 3] :init-b]
   [[:a 3] [:b 10]]
   [[:a 3] [:b 20]]
   [[:a 3] [:b 30]]]
I don't care about a consistent view of snapshot of the values across the two flows, I just want the values in the roughly the order they were ready to be emitted from either. It almost seems like I want both flows to write to a m/mbox or something like it but I don't want an unbounded buffer.

Dustin Getz01:10:11

oh you want to race them

Dustin Getz01:10:52

you want “mix” there is an implementation in a closed github issue

leonoel07:10:56

the essence of this problem is amb=

(m/reduce conj (m/ap (m/?> (m/amb= flow1 flow2))))

pez08:10:06

An m/mix would be lovely.

leonoel08:10:25

there is nothing wrong with mix, I'm just not interested promoting it

adamfrey10:10:45

Beautiful, thank you. I thought there was something fairly basic I was missing. It wasn't obvious to me that you could pass the flows directly to amb= like (m/amb= flow1 flow2)