Fork me on GitHub
#core-async
<
2017-09-22
>
Avichal09:09:02

Thanks @alexmiller this is really helpful.

kwladyka11:09:05

https://pastebin.com/zsJf4N9L how can i start next sync AFTER first one? How to know first one finished?

noisesmith11:09:21

@kwladyka the first dosync doesn't close until it's body completes, but put! is async

noisesmith11:09:59

oh, wait, you are using >!! - so it doesn't return until all the messages are put

kwladyka11:09:31

yeah but i have to be sure all jobs are done before run next one

kwladyka11:09:42

at that moment i am not sure, because something can still processing

kwladyka12:09:14

(archai/fetch-epoch #(>!! out-elastic %) input) - this line is not one time >!! it is doing this many times inside this function

kwladyka12:09:06

so at right moment i have to run:

(doseq [epoch epochs]
        (>!! in-archai {:url (archai/make-url {:stream "BAR"
                                               :epoch epoch})}))

kwladyka12:09:28

if i will run it immediately after first one it can be situation like: first on still processing and >!! out-elastic and second one start doing this at the some time.

kwladyka12:09:38

because there is more than 1 worker

noisesmith12:09:58

@kwladyka what about making two pipeline-async calls, closing the input channel on the first after the first doseq, then waiting on its output (which means all buffered results have completed), then starting up another one to use in the second doseq

kwladyka12:09:30

whatever solution, but how can i know first one finished?

noisesmith12:09:44

by waiting on it's return value right?

noisesmith12:09:20

it will close the to channel - so I guess you also need a second to-channel

noisesmith12:09:38

and iirc it returns its to-channel (for convenience)

kwladyka12:09:45

i tried something like that (<!! (pipeline-async workers-archai out-elastic archai->elastic in-archai)) but it hangs here

noisesmith12:09:02

did you close its input channel when you finished the puts?

noisesmith12:09:24

it doesn't close its output until the input is closed and all results from it are delivered

kwladyka12:09:39

oh… so maybe after

(doseq [epoch epochs]
      (>!! in-archai {:url (archai/make-url {:stream "FOO"
                                             :epoch epoch})}))
i should close channel. Inputs inside will be processing and after that (pipeline-async) will unblock <!!

kwladyka12:09:42

Am i right?

kwladyka12:09:14

like it finish after finish processing all buffered queue not immediately after close channel?

noisesmith12:09:24

right - you probably need to reorder things somewhat

noisesmith12:09:33

and it's slightly more complicated because if you close in-archai that means it will close out-elastic, and the function that consumes from out-elastic is the one that will detect that

noisesmith12:09:03

the more things we need to duplicate here, the more it looks like you need to make a function that gets called twice instead of doing things twice in one function

kwladyka12:09:26

yes, probably now i should a little refactor it

kwladyka12:09:30

BTW (dotimes [_ workers-elastic] do you think this method to create N workers is ok? Any better way?

noisesmith12:09:25

it's how I'd do it

kwladyka12:09:04

(<!! (pipeline-async workers-archai out-elastic archai->elastic in-archai)) hmm this one return immediately when i (close! in-archai), not wait to the end of out-elastic

kwladyka12:09:14

in right situation i should also know when workers will finish processing out-elastic, not only when out-elastic is empty

kwladyka12:09:21

async is sometimes so confuse

noisesmith12:09:09

honestly I'm sick and not totally with it today and should stop trying to help people with code, but yeah, async is intrinsically hard

kwladyka12:09:44

don’t stop help people 😛 😉

kwladyka12:09:06

but as long as you are sick people should help you 🙂

noisesmith12:09:52

I'm just realizing I'm not thinking clearly

kwladyka12:09:58

heh i am sick today too, exactly i for some reason i have extremely allergy today, like i have never had

kwladyka12:09:29

so i perfectly understand “not thinking clearly” today 🙂