Hi. I'm scratching my head over this. Why does the flow never terminate here? Also, I expected the values passed to prn to appear in the order of the range, but larger numbers are printed first.
(comment
  (m/? (m/reduce conj [] (m/eduction (take 2) (bar))))
  ; expect: [{:id 0} {:id 1}]
  ; result: flow never terminates,
  ; and prints multiple values starting from larger numbers
  )

(defn bar []
  (m/ap
    (let [[id m>]
          (m/?> ##Inf
            (m/group-by :id
              (m/eduction (map (fn [i] {:id i}))
                (m/seed (range)))))
          m (m/?> m>)]
      (prn id m)
      m)))

(m/?> ##Inf ,,,) on an infinite input is dangerous: there is nothing to bound memory.
(m/group-by ,,,) with an unbounded key space is also dangerous, for the same reason.
ap never terminates because it runs in an infinite loop, which prevents it from ever catching the cancellation signal.
The loop: when a group value is consumed, another group becomes available; that group is immediately consumed by the first m/?>, then a new value is pulled from it, and so on.
Lazy ap should break the infinite loop by deferring the last pull, but the pattern is still dangerous.
The branches spawned by m/?> run concurrently, so evaluation order across branches is undefined, and the ordering of branch results on transfer is also undefined. The former is reasonable, I think, but the latter is debatable: branch results could be transferred FIFO, as you expected.
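For comparison, here is a minimal sketch of the same pipeline over a finite input, so that both the key space and the number of concurrent branches are bounded. It assumes missionary.core is required as m; bar-finite and the size 10 are hypothetical, chosen only for illustration. With a finite source, every group should terminate once the source is exhausted, so the reduce can complete. The results are compared as a set because ordering across branches is still undefined.

```clojure
(require '[missionary.core :as m])

;; Hypothetical finite variant of bar: (range 10) instead of (range),
;; so at most 10 groups (and 10 concurrent branches) ever exist.
(defn bar-finite []
  (m/ap
    (let [[id m>] (m/?> ##Inf
                    (m/group-by :id
                      (m/eduction (map (fn [i] {:id i}))
                        (m/seed (range 10)))))
          ;; fork again on the values emitted by this group
          m (m/?> m>)]
      m)))

;; Blocks the calling thread until the flow terminates (JVM only).
(def result (m/? (m/reduce conj [] (bar-finite))))

;; Order across branches is undefined, so compare as a set.
(= (set result) (set (map (fn [i] {:id i}) (range 10))))
```

If ordering matters, this sketch suggests sorting or otherwise re-ordering downstream rather than relying on the transfer order of branches.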
Alright. That's clear. Thank you.