given the following minimal repro:
(defn flatten-flows [broken?]
  (m/ap
    (m/?>
      (m/?> ##Inf
        (m/seed
          (map (fn [input]
                 (m/eduction (map (fn [x]
                                    (let [y (if broken?
                                              (m/? (m/via m/blk x))
                                              x)]
                                      [input y])))
                             (m/seed [4 5 6])))
               [1 2 3]))))))
(defn consume-flow
  [broken?]
  ((m/reduce
     (fn [_ x]
       (log/info :consume-flow x))
     []
     (flatten-flows broken?))
   (fn [_]
     (log/info "flow consumed"))
   #(log/error "Error during consumption of flow" %)))
> (consume-flow true)
:consume-flow nil
:consume-flow 4
:consume-flow 4
flow consumed
> (consume-flow false)
:consume-flow [3 4]
:consume-flow [2 4]
:consume-flow [1 4]
:consume-flow [3 5]
:consume-flow [2 5]
:consume-flow [1 5]
:consume-flow [3 6]
:consume-flow [2 6]
:consume-flow [1 6]
flow consumed
what am I missing? shouldn't (m/? (m/via m/blk x)) be equivalent to x there?
m/? is illegal here because it's called from a coroutine context, but via an indirect call
so there should be no task in the xf of an eduction?
m/? is wrapped in an fn, so m/ap doesn't see it: the macro only rewrites the code directly in its body, not the body of a nested fn
you could write the same eduction code directly in the ap block, something like (let [x (m/?> (m/seed [4 5 6])), y (if broken? (m/? (m/via m/blk x)) x)] [input y]). Same for the outer eduction
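Spelled out, that suggestion might look like the sketch below. It assumes missionary.core is required as m (as in the repro) and replaces both eductions with forks inside the ap block, so m/? sits directly in the ap body rather than inside an fn. This is one possible rewrite, not necessarily the exact code intended above, and the order of emitted pairs may differ from the original output.

```clojure
(require '[missionary.core :as m])

(defn flatten-flows [broken?]
  (m/ap
    (let [;; fork concurrently on each input (replaces the outer map + seed)
          input (m/?> ##Inf (m/seed [1 2 3]))
          ;; fork sequentially on each inner value (replaces the inner eduction)
          x     (m/?> (m/seed [4 5 6]))
          ;; m/? now appears directly in the ap body, so the macro can see it
          y     (if broken?
                  (m/? (m/via m/blk x))
                  x)]
      [input y])))
```

With this version, (consume-flow true) and (consume-flow false) should both log nine [input y] pairs before "flow consumed".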