#core-async
2020-04-21
kenny 23:04:47

I'm looking to create something that models a branching flow like this

in
+
|
+--->error?+--->error
|
v
xf
+
|
+--->error?+--->error
|
v
out
Data comes in on the in channel, and any elements for which error? returns true go to the error channel. Then xf is applied to all non-error elements from in. Any results of xf for which error? is true also go to the error channel, and the remaining non-error results go to out. I was thinking something like this
(defn pipeline-transform
  [in xf error-pred out error]
  (let [[errors in'] (async/split error-pred in)
        _ (async/pipe errors error false)
        ;; a channel with a transducer needs a buffer; (async/chan nil xf) fails an assertion
        apply-xf (async/chan 1 xf)
        _ (async/pipe in' apply-xf)
        [errors apply-xf'] (async/split error-pred apply-xf)
        _ (async/pipe errors error)]
    (async/pipe apply-xf' out)
    nil))
That seems a bit hard to follow though. Is there something built-in that I'm missing that would allow me to write this cleaner?

serioga 06:04:00

Did you try to use async/pub?
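
A minimal sketch of what the pub suggestion could look like for a single routing step, assuming core.async is on the classpath. The name `route-by-error` and the `:error`/`:ok` topics are hypothetical and were not part of the conversation. Values published to a topic with no subscriber are dropped, so both channels are subscribed before anything is put on `in`.

```clojure
(require '[clojure.core.async :as async])

(defn route-by-error
  "Sends each value from `in` to `error` when (error? v) is truthy,
  otherwise to `out`. Hypothetical helper, for illustration only."
  [in error? out error]
  (let [;; the topic function decides which subscriber sees each value
        p (async/pub in (fn [v] (if (error? v) :error :ok)))]
    ;; sub closes the subscribed channel when `in` closes (close? defaults to true)
    (async/sub p :error error)
    (async/sub p :ok out)
    p))
```

Chaining two of these around a channel carrying xf would reproduce the full flow; the second stage would need to subscribe to `error` with `close?` set to false in at least one stage if the error channel must stay open until both stages finish.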

Ben Sless 08:04:08

how about this?

(defn pipeline-transform
  [in xf error? out errors]
  (let [applied-ch (async/chan 1 xf) ; a transducer needs a buffer; (async/chan xf) would treat xf as the buffer
        [invalid-ch valid-ch] (async/split error? in)
        [fail-ch success-ch] (async/split error? applied-ch)
        errors' (async/merge [invalid-ch fail-ch])]
    (async/pipe valid-ch applied-ch)
    (async/pipe success-ch out)
    (async/pipe errors' errors)))

kenny 14:04:35

@U0HJNJWJH I think pub is for distributing messages to multiple subscribers. Maybe it'd work here, with out and error as the two subscribers. I need to think on that a bit more. @UK0810AQ2 Like the original code, that doesn't apply back pressure to the producer. Specifying buffers of 1 would probably fix that. Since the two versions are essentially the same code, I'm not sure which layout I'd prefer.

Ben Sless 14:04:26

I stuck to the original semantics; I see applying backpressure as orthogonal to the design problem 🙂 Maybe build an idiom on top of that, with a building block like this:

(def errors (async/mix errors-out))

(defn either
  [in out errors pred]
  (let [[ch-t ch-f] (async/split pred in)]
    (async/admix errors ch-f)
    (async/pipe ch-t out)))
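
A sketch of how the `either` building block might be composed into the two-stage flow, assuming core.async is on the classpath. The names `pipeline-transform-mix`, `errors-mix`, and `valid?` are hypothetical. Because `async/split` returns the truthy channel first and `either` pipes that channel to `out`, the predicate passed in has to be a validity check, here `(complement error?)`. The `either` definition is repeated so the block is self-contained.

```clojure
(require '[clojure.core.async :as async])

(defn either
  "Values satisfying `pred` go to `out`; the rest are added to the `errors` mix."
  [in out errors pred]
  (let [[ch-t ch-f] (async/split pred in)]
    (async/admix errors ch-f)
    (async/pipe ch-t out)))

(defn pipeline-transform-mix
  "Hypothetical composition: filter errors, apply xf, filter errors again."
  [in xf error? out errors-out]
  (let [errors-mix (async/mix errors-out)   ; collects errors from both stages
        valid?     (complement error?)
        applied    (async/chan 1 xf)]       ; a transducer requires a buffer
    (either in applied errors-mix valid?)   ; stage 1: raw input
    (either applied out errors-mix valid?)  ; stage 2: results of xf
    nil))
```

One consequence of using a mix is that `errors-out` is not closed when `in` closes, whereas `out` is closed through the chain of pipes.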

kenny 23:04:58

That code also doesn't apply back pressure on the producer, which could be quite problematic.

kenny 00:04:14

I suppose bounding all intermediate channels at a buffer size of 1 would fix it.
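
A sketch of the bounded variant suggested in the last message, assuming core.async is on the classpath. The name `pipeline-transform-bounded` is hypothetical. It passes explicit buffer sizes of 1 to `async/split` and `async/merge`, both of which accept optional buffer arguments, and gives the transducer channel a buffer of 1. Whether this yields the back pressure wanted in practice still depends on how quickly `out` and `error` are consumed, and an expanding transducer can temporarily exceed its buffer.

```clojure
(require '[clojure.core.async :as async])

(defn pipeline-transform-bounded
  "Same flow as pipeline-transform, with every intermediate channel
  bounded at a buffer of 1. Hypothetical variant, for illustration only."
  [in xf error? out error]
  (let [[invalid-ch valid-ch]   (async/split error? in 1 1)
        applied-ch              (async/chan 1 xf)
        [fail-ch success-ch]    (async/split error? applied-ch 1 1)
        ;; merge closes its output once both error sources have closed
        errors'                 (async/merge [invalid-ch fail-ch] 1)]
    (async/pipe valid-ch applied-ch)
    (async/pipe success-ch out)
    (async/pipe errors' error)
    nil))
```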