#missionary
2024-05-01
kawas 21:05:53

Hello, I'm playing in a REPL with this code. Using m/stream with m/seed produces fewer values than expected, but with a custom seed that produces its values on a missionary scheduler, it works. Why?

(defn dup [flow]
  (let [flow (m/stream flow)]
    (m/ap (m/amb= [(m/?> flow)] [(m/?> flow) :dup]))))

(defn myseed [coll]
  (m/ap (loop [coll coll]
          (if (seq coll)
            (m/amb (m/? (m/sleep 0 (first coll)))
                   (recur (rest coll)))
            (m/amb)))))

(comment
  ;; should produce 10 values
  (drain! prn (dup (m/seed (range 5))))  ;; => 8 values!
  (drain! prn (dup (myseed (range 5))))  ;; => 10 values (use alt thread)
  )
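
For anyone reproducing this: `drain!` is not defined in the snippet above. A minimal sketch, assuming it simply runs the flow to completion on the calling thread and calls `f` on each value (this is a guess at the helper, not its actual definition), with `m` assumed to alias `missionary.core`:

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-in for the `drain!` helper used above; its real
;; definition is not shown in the thread. Reduces over `flow`, calling
;; `f` on each value for side effects, and blocks the calling thread
;; until the flow terminates (blocking with m/? is JVM only).
(defn drain! [f flow]
  (m/? (m/reduce (fn [_ x] (f x) nil) nil flow)))

(comment
  ;; prints 0, 1 and 2, one per line
  (drain! prn (m/seed (range 3))))
```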

leonoel 07:05:13

The fix is to add another stream on the consumer side:

(defn dup [flow]
  (let [flow (m/stream flow)]
    (m/stream (m/ap (m/amb= [(m/?> flow)] [(m/?> flow) :dup])))))
It's hard to explain why without talking about the implementation (which is still experimental). I agree this behavior is confusing, and it's still unclear to me what the right answer is here.
