#core-async
2020-03-24
kirill.salykin12:03:19

Hi all, please advise: does it make sense to use reduce with pipeline-blocking?

(async/pipeline-blocking concurrent
                         output-chan
                         (reduce
                          (fn [acc hour]
                            (let [acc* (disj acc hour)]
                              (worker hour context)
                              (if (seq acc*)
                                acc*
                                (reduced :done))))
                          hours)
                         (async/to-chan hours))
What I want to achieve is to send :done to the output channel once all hours have been processed

mavbozo14:03:45

i haven't tried that before. async/pipeline-blocking requires a transducer.

mavbozo14:03:45

i've used pipeline-blocking before, but with a transducer.

kirill.salykin14:03:34

that is my concern, that reduce may not work 😞

serioga21:03:27

I think pipeline-blocking returns a channel which is closed when processing is done.

Ben Sless06:03:44

why not use async/reduce?

Ben Sless06:03:41

or something. You're not propagating the hours, you're working on them
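A minimal sketch of the `async/reduce` idea, assuming the goal is only to learn when a channel of results has been fully consumed. The name `done-when-drained` is hypothetical and not from the thread; `async/reduce` itself takes a reducing function, an initial value and a channel, and returns a channel that yields the final value once the input channel closes.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: ignores every value taken from `ch` and yields
;; :done on the returned channel once `ch` has been closed and drained.
(defn done-when-drained [ch]
  (async/reduce (fn [_acc _v] :done) :done ch))

;; Usage: fill a channel, close it, then block until it has been consumed.
(let [ch (async/chan 3)]
  (doseq [hour [1 2 3]]
    (async/>!! ch hour))
  (async/close! ch)
  (println (async/<!! (done-when-drained ch))))
```

The reducing function discards its arguments here because only the completion signal matters; a real reducer could accumulate results instead.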

Ben Sless06:03:24

So a way to use pipeline-blocking would be:
1. Create an input channel with the transducer `cat` on it
2. Create an output channel
3. Create the pipeline-blocking with the transducer `(map #(worker % context))`
4. Put `hours` on the input channel
Then do some sort of take from the output channel (`<!!`, `<!`, or `take!`). That take will block until the work is finished
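The steps above could be sketched roughly as follows. This is an illustration under assumptions, not the code that was actually run: `process-hours` is a hypothetical wrapper, and `worker` is a stub standing in for the function from the question, assumed to return a non-nil value because nil cannot be put on a channel. The sketch drains the output channel until it closes, so the blocking take returns only after every hour has been processed.

```clojure
(require '[clojure.core.async :as async])

;; Stub for the `worker` from the question (assumption: returns non-nil).
(defn worker [hour context]
  (Thread/sleep 10) ; simulate blocking work
  {:hour hour :context context})

;; Hypothetical wrapper around the steps described above.
(defn process-hours [concurrency hours context]
  (let [in  (async/chan 1 cat)          ; `cat` expands a collection into items
        out (async/chan concurrency)]
    ;; Run `worker` on up to `concurrency` threads, results go to `out`.
    (async/pipeline-blocking concurrency out (map #(worker % context)) in)
    (async/>!! in hours)                ; put the whole collection at once
    (async/close! in)                   ; no more input; `out` closes when done
    ;; Collect everything from `out`; blocks until the pipeline has finished.
    (async/<!! (async/into [] out))))

(println (process-hours 2 [1 2 3] :ctx))
```

Closing the input channel is what lets the pipeline close the output channel, which in turn is what lets the final take complete. Results arrive in the same order as the inputs.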

kirill.salykin06:03:28

interesting... let me try. thanks!

kirill.salykin07:03:54

works, thanks!
