#missionary
2024-04-12
Eric Dvorsak14:04:45

given the following minimal repro:

(defn flatten-flows [broken?]
  (m/ap
    (m/?>
     (m/?> ##Inf
           (m/seed
            (map (fn [input]
                   (m/eduction (map (fn [x]
                                      (let [y
                                            (if broken?
                                              (m/? (m/via m/blk x))
                                              x)]
                                        [input y])))
                               (m/seed [4 5 6])))
                 [1 2 3]))))))

(defn consume-flow
  [broken?]
  ((m/reduce
    (fn [_ x]
      (log/info :consume-flow x))
    []
    (flatten-flows broken?))
   (fn [_]
     (log/info "flow consumed"))
   #(log/error "Error during consumption of flow" %)))

> (consume-flow true)
:consume-flow nil
:consume-flow 4
:consume-flow 4
flow consumed

> (consume-flow false)

:consume-flow [3 4]
:consume-flow [2 4]
:consume-flow [1 4]
:consume-flow [3 5]
:consume-flow [2 5]
:consume-flow [1 5]
:consume-flow [3 6]
:consume-flow [2 6]
:consume-flow [1 6]
flow consumed
what am I missing? shouldn't (m/? (m/via m/blk x)) be equivalent to x there?

leonoel14:04:01

m/? is illegal here because it's called from a coroutine context but via an indirect call

Eric Dvorsak15:04:06

so there should be no task in the xf of an eduction?

xificurC15:04:28

m/? is wrapped in an fn, therefore m/ap doesn't see it

xificurC15:04:26

you could write the same eduction code directly in the ap block, something like (let [x (m/?> (m/seed [4 5 6])), y (if broken? (m/? (m/via m/blk x)) x)] [input y]). Same for the outer eduction
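
A minimal sketch of that suggestion, assuming missionary is on the classpath and that a `?>` with a parallelism argument is available (as the original repro already uses). The name `flatten-flows-inline` and the `blocking?` parameter are hypothetical, chosen for illustration. Both forks and the `m/?` now sit directly in the `m/ap` body, with no intervening `fn`, so the macro can see and rewrite them:

```clojure
(require '[missionary.core :as m])

;; Hypothetical rewrite of flatten-flows: no eduction, no nested fn.
(defn flatten-flows-inline [blocking?]
  (m/ap
    (let [input (m/?> ##Inf (m/seed [1 2 3])) ; fork concurrently on each input
          x     (m/?> (m/seed [4 5 6]))       ; fork sequentially on each inner value
          y     (if blocking?
                  ;; m/? is a direct child of the ap body here,
                  ;; so parking on the task is legal.
                  (m/? (m/via m/blk x))
                  x)]
      [input y])))

;; Collect every pair. Emission order is not guaranteed because the
;; outer fork is concurrent, so compare as a set.
(def result
  (set (m/? (m/reduce conj [] (flatten-flows-inline true)))))
```

With either value of `blocking?` this should yield the nine `[input y]` pairs. Only the order in which they arrive may differ between runs.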