Fork me on GitHub
#core-async
<
2022-02-03
>
Franco Gasperino18:02:42

good morning. i have a how-to question regarding mult, tap, and the various pipelines.

;; Goal:
  ;;   + messages published on channel out-1.
  ;;   + messages should be sent to both in-1 and in-2.
  ;;   + messages for in-1 and in-2 should travel different
  ;;     processing paths.

  ;; create all 3 base channels.
  (def out-1 (a/chan 5))
  (def in-1 (a/chan 5))
  (def in-2 (a/chan 5))

  ;; create a mult of out-1, and tap this multi to both
  ;; in-1 and in-2.
  (def out-1-mult (a/mult out-1))
  (a/tap out-1-mult in-1)
  (a/tap out-1-mult in-2)

  ;; transducer stack to apply on the pipeline. Take a
  ;; number, add n, then multiply by 2.
  (defn xform
    [n]
    (comp
     (map #(+ n %))
     (map #(* 2 %))))

  ;; The pipeline between out-1 (out-1-mult) and in-1 should
  ;; run the transducer stack with parameter 1.
  (def p-1 (a/pipeline
            1
            in-1
            (xform 1)
            out-1-mult))

  (def p-2 (a/pipeline
            1
            in-2
            (xform 2)
            out-1-mult))

  (a/>!! out-1 1)
  (a/<!! in-1)
  (a/<!! in-2)

Franco Gasperino18:02:24

this results in the following exception:

Exception in thread "async-dispatch-2" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.core.async$mult$reify__14980

Franco Gasperino18:02:41

is there guidance on higher level channel abstractions and pipelines?

ghadi18:02:58

the result of mult is not a channel

ghadi18:02:17

it's a stateful thing that manages taps/untaps

Franco Gasperino18:02:38

that makes sense

ghadi18:02:42

so you can't pass the mult directly as the argument to pipeline

ghadi18:02:05

you can pass the thing that is "multed" (out-1)

Franco Gasperino18:02:07

but i can use out-1

ghadi18:02:16

(not out-1-mult)

Franco Gasperino18:02:00

it looks like it eludes the pipeline effect, though

Franco Gasperino18:02:25

(a/>!! out-1 1)
  (a/<!! in-1) => 1
  (a/<!! in-2) => 1

Franco Gasperino18:02:12

in this case, ill be expected to use in-1 and in-2 as the from parameter, and spin up another pair of in channels

ghadi18:02:40

user=> (doc a/pipeline)
-------------------------
clojure.core.async/pipeline
([n to xf from] [n to xf from close?] [n to xf from close? ex-handler])
your arguments look backwards

ghadi18:02:43

to <> from

Franco Gasperino18:02:57

i want the message to flow from the out-1 -> in-1|in-2

Franco Gasperino18:02:11

;; Goal:
  ;;   + messages published on channel out-1.
  ;;   + messages should be sent to both in-1 and in-2.
  ;;   + messages for in-1 and in-2 should travel different
  ;;     processing paths.

  ;; create all 3 base channels.
  (def out-1 (a/chan 5))
  (def in-1 (a/chan 5))
  (def in-2 (a/chan 5))

  ;; create a mult of out-1, and tap this multi to both
  ;; in-1 and in-2.
  (def out-1-mult (a/mult out-1))
  (a/tap out-1-mult in-1)
  (a/tap out-1-mult in-2)

  ;; in-1 and in-2 now become source channels for the
  ;; proceeding pipelines.
  (def in-1-final (a/chan 5))
  (def in-2-final (a/chan 5))

  ;; transducer stack to apply on the pipeline. Take a
  ;; number, add n, then multiply by 2.
  (defn xform
    [n]
    (comp
     (map #(+ n %))
     (map #(* 2 %))))

  ;; The pipeline between out-1 (out-1-mult) and in-1 should
  ;; run the transducer stack with parameter 1.
  (def p-1 (a/pipeline
            1
            in-1-final
            (xform 1)
            in-1))

  (def p-2 (a/pipeline
            1
            in-2-final
            (xform 2)
            in-2))

  (a/>!! out-1 1)
  (a/<!! in-1-final)
  (a/<!! in-2-final)

Franco Gasperino18:02:22

the last 2 reads produce 4 and 6 respectively

Franco Gasperino18:02:19

the multi + tap actions turn the in-1 and in-2 into the <from> args of the pipeline

Franco Gasperino18:02:46

effectively broadcast + pipeline i guess