Hello,
Playing in a REPL with this code, using m/stream and m/seed produces fewer values than expected. But with a custom seed that produces its values on a missionary scheduler, it works... Why?
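(require '[missionary.core :as m])

;; Share `flow` via m/stream, then emit each value twice: once alone, once tagged :dup.
(defn dup [flow]
  (let [flow (m/stream flow)]
    (m/ap (m/amb= [(m/?> flow)] [(m/?> flow) :dup]))))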
(defn myseed [coll]
  (m/ap (loop [coll coll]
          (if (seq coll)
            (m/amb (m/? (m/sleep 0 (first coll)))
                   (recur (rest coll)))
            (m/amb)))))
(comment
  ;; should produce 10 values
  (drain! prn (dup (m/seed (range 5))))  ;; => 8 values!
  (drain! prn (dup (myseed (range 5))))  ;; => 10 values (use alt thread)
  )
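The definition of drain! is not shown in the thread. To try the snippet at a JVM REPL, a minimal stand-in could look like the sketch below. It is an assumption, not the original helper: it consumes the flow with m/reduce, calls f on each value for its side effect, and blocks the calling thread with m/? until the flow terminates.

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for the `drain!` used in the snippet above
;; (the real helper is not shown in the thread).
;; Runs `flow` to completion, calling `f` on every value for side effects.
;; Calling m/? outside of m/sp or m/ap blocks the current thread (JVM only).
(defn drain! [f flow]
  (m/? (m/reduce (fn [_ x] (f x) nil) nil flow)))
```

For example, (drain! prn (m/seed (range 3))) prints each value of the flow on its own line and returns nil.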
The fix is to wrap the consumer in another m/stream:
(defn dup [flow]
  (let [flow (m/stream flow)]
    (m/stream (m/ap (m/amb= [(m/?> flow)] [(m/?> flow) :dup])))))
It's hard to explain why without going into the implementation, which is still experimental.
I agree this behavior is confusing; it's still unclear to me what the right answer is here.