#core-async
2020-05-08
bfay 19:05:23

Just want a sanity check: does this pattern seem safe? I'm running a worker task on a future, and I'd like to be able to exit it cleanly, so that whatever work is in progress finishes cleanly and then the future exits its loop without sleeping. I'm using async/alt!! as a trick for this:

(require '[clojure.core.async :as async])

(let [exit-ch (async/chan)
      my-task (future (loop []
                        ;; do work here
                        (async/alt!! (async/timeout 5000) (recur)
                                     exit-ch nil)))
      exit-fn (fn [] (async/close! exit-ch))]
  ;; eventually will run exit-fn to safely cancel the future
  )
I'm seeing an issue where this worker task appears to no longer be running... I'm wondering if using alt!! this way could cause some unexpected problems?

hiredman 19:05:33

alt!! should work fine like that

hiredman 19:05:26

I always write it like

(async/alt!! (async/timeout 5000)
             ([_] (recur))
             exit-ch
             ([_] nil))
but I think I recall it working if you elide the binding for the result as well

Ben Grabow 19:05:28

I've never seen recur used inside the body of alt!! and I'm skeptical that would work given the macro rewriting that happens inside alt!!.

Ben Grabow 19:05:27

Can you try wrapping the body of your future in a try/catch and printing if that body throws? My guess is that an exception is thrown from alts!! but swallowed by the future.
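A minimal sketch of Ben's try/catch suggestion, assuming the work is passed in as a function; `run-worker` and `work-fn` are hypothetical names, not from the chat. Catching inside the future and printing makes a failure visible as soon as it happens, instead of it sitting in the future until someone derefs it.

```clojure
(require '[clojure.core.async :as async])

(defn run-worker
  "Hypothetical helper: calls (work-fn) in a loop on a future, pausing
  5 seconds between runs, until exit-ch is closed. Any exception is
  printed before being rethrown, so it shows up immediately instead of
  waiting inside the future until someone derefs it."
  [work-fn exit-ch]
  (future
    (try
      (loop []
        (work-fn)
        (async/alt!! (async/timeout 5000) (recur)
                     exit-ch nil))
      (catch Throwable t
        (println "worker died:" (.getMessage t))
        (throw t)))))
```

Rethrowing keeps the original behavior of the future (deref still throws), while the println gives an immediate signal in the logs.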

bfay 19:05:49

Yeah, that's a good point. I did try derefing the future after closing the channel, and all I saw returned was nil (I would have expected an exception if alt!! had thrown one). But I will definitely try that

Ben Grabow 19:05:20

Normally the clauses in alt!! are values, or at least expressions that evaluate to a value, but recur is different from that

bfay 19:05:32

Right now I'm just running a stripped-down version of this that runs the future, loops/recurs and prints. I want to see if it will eventually stop or gobble up a ton of memory. So far it's run over 22,000 times with no issues

Ben Grabow 19:05:30

Other than the recur thing, I don't see anything glaring about your use of alt!!. I'm guessing you typed up the example above in Slack, so the mismatch between shutdown-ch and exit-ch is not actually a problem in your real code? Also, I normally put the shutdown-ch first and use the {:priority true} option to deterministically shut down as soon as the shutdown-ch closes.
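A sketch of Ben's suggestion applied to the worker loop: the shutdown channel is listed first and `:priority true` makes alt!! try the clauses in order, so a closed shutdown channel always wins even when the timeout is also ready. Inside the alt!! macro the option is written as a trailing `:priority true` pair. `start-worker`, `work-fn` and `interval-ms` are hypothetical names.

```clojure
(require '[clojure.core.async :as async])

(defn start-worker
  "Hypothetical helper: calls (work-fn) every interval-ms on a future
  until shutdown-ch is closed. Returns the future."
  [work-fn shutdown-ch interval-ms]
  (future
    (loop []
      (work-fn)
      ;; With :priority true, alt!! checks the clauses in order, so a
      ;; closed shutdown-ch is chosen over a timeout that is also ready.
      (async/alt!!
        shutdown-ch nil
        (async/timeout interval-ms) (recur)
        :priority true))))
```

Without `:priority true`, alts!! picks randomly among ready clauses, so shutdown could be delayed by an extra iteration when both channels are ready at once.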

bfay 19:05:56

haha whoops yes, just a typo. Wasn't familiar with the priority option but that sounds like a good improvement. Thank you!

bfay 19:05:51

For more context, I was originally using a future running a long-running task that did a 5-second Thread/sleep (the same loop/recur kind of pattern). There was no clean way to kill this future on service shutdown; I could try signalling it to shut down using an atom, but with that Thread/sleep it would still take a long time to exit. I introduced alt!! with this timeout and the shutdown-ch as a way to retain the original sleeping behavior while giving it an option to exit immediately by closing the exit channel

dpsutton 19:05:35

i wrote something similar to this yesterday/today:

(defn watch-for-cancelling [{job-id :id} cancel-ch stop-ch]
  (a/go-loop []
    (if-let [cancel? (a/<! (a/thread (-> {:select [:id]
                                          :from [:]
                                          :where [:and
                                                  [:= :id job-id]
                                                  [:= :status "cancelled"]]}
                                         (hdb/fetch)
                                         seq)))]
      (do (log/warnf "Cancelling job: %s" job-id)
          (a/>! cancel-ch ::cancel))
      (when (= :recur (a/alt! (a/timeout 5000) :recur
                              stop-ch :job-done))
        (recur)))))
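A runnable sketch of the same watcher shape, assuming the database query and logging are replaced by a hypothetical blocking predicate `cancelled?` and the poll interval is a hypothetical `poll-ms` parameter. It also puts stop-ch first with `:priority true`, following Ben's earlier suggestion, which the original snippet does not do.

```clojure
(require '[clojure.core.async :as a])

(defn watch-for-cancelling
  "Sketch of the watcher above with the database query and logging
  replaced by a hypothetical blocking predicate `cancelled?`. Polls
  every poll-ms until it sees a cancellation or stop-ch is closed."
  [cancelled? cancel-ch stop-ch poll-ms]
  (a/go-loop []
    ;; Run the blocking check on a/thread so it does not tie up one of
    ;; the fixed-size go-block threads.
    (if (a/<! (a/thread (cancelled?)))
      (a/>! cancel-ch ::cancel)
      ;; alt! yields a keyword; only :recur continues the loop.
      (when (= :recur (a/alt! stop-ch :job-done
                              (a/timeout poll-ms) :recur
                              :priority true))
        (recur)))))
```

Returning a keyword from alt! and testing it outside keeps the recur visibly in tail position of the go-loop, which is the style dpsutton chose.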

hiredman 19:05:23

you can recur directly from within an alt

dpsutton 19:05:38

i wouldn't have thought to try that. neat

hiredman 19:05:17

but I never write an alt clause without a binding, so I forget how that works

dpsutton 19:05:00

what do you mean with a binding?

hiredman 19:05:27

a binding for the result of the alt

dpsutton 19:05:48

there are four in the snippet you provided

hiredman 19:05:53

(alt! whatever ([x] do-something-with-x))

bfay 19:05:07

It seems to be working fine without the binding; I'm just getting some weird issues after the code has been running for two days. Not sure at this point if it's something elsewhere in my code or something happening with the alt. Probably need some more logging

hiredman 19:05:11

x is bound to the result of the whatever channel operation
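To illustrate the binding form: in a clause `port ([v] body)`, `v` is bound to the value of that channel operation, and a two-element vector `[v port]` also binds the port that completed, which is handy when one clause lists several ports. `take-labelled` and its channels are made-up names for this sketch.

```clojure
(require '[clojure.core.async :as async])

(defn take-labelled
  "Takes from whichever of a or b is ready first and reports which
  channel it was; returns :timed-out if neither delivers within 1 s."
  [a b]
  (async/alt!!
    ;; One clause may list several ports. The binding vector [v port]
    ;; binds the value taken and the port that completed.
    [a b] ([v port] [(if (= port a) :a :b) v])
    ;; A clause expression with no binding vector is just returned.
    (async/timeout 1000) :timed-out))
```

Taking from a closed channel completes immediately with nil, so a closed `a` yields `[:a nil]`.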

fmjrey 19:05:13

Not sure if that's any help, and I have no time to delve into the details of your problem @bfay, but my first thought here is:

core.async/thread works off a separate unbounded thread pool, distinct from any other thread pool, while clojure.core/future uses the agent send-off thread pool, which is also unbounded but is shared with send-off calls anywhere in the program. An important difference, however: core.async/thread threads are daemon threads that the JVM will not wait on before exiting, unlike clojure.core/future threads.
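The daemon-thread point can be checked directly. This sketch only relies on `Thread/isDaemon`; the function names are hypothetical. A new thread inherits its daemon flag from the thread that creates it, and Clojure's agent pool does not override it, so a future started from an ordinary main thread typically runs on a non-daemon thread.

```clojure
(require '[clojure.core.async :as async])

(defn async-thread-daemon?
  "Runs a body with core.async/thread and reports whether the thread
  executing it is a daemon thread."
  []
  (async/<!! (async/thread (.isDaemon (Thread/currentThread)))))

(defn future-thread-daemon?
  "Same check for clojure.core/future. A new pool thread inherits its
  daemon flag from the thread that caused it to be created."
  []
  @(future (.isDaemon (Thread/currentThread))))
```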

hiredman 19:05:17

I just looked at the code

hiredman 19:05:41

alt! assumes if the expr is a seq then it is a binding

hiredman 19:05:52

which is why your recur isn't working

hiredman 19:05:12

just always do the binding form

bfay 19:05:49

wait really? I mean this example runs and has no binding form

(future (loop []
          (println "in loop" (rand-int 500))
          (async/alt!! (async/timeout 50) (recur)
                       shutdown-ch nil)))

bfay 19:05:43

(where shutdown-ch is just a barebones (async/chan))

hiredman 19:05:10

ah, misread the code, it checks if it is a seq, and if the first element of the seq is a vector
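The rule hiredman describes can be mirrored with a tiny predicate. This is a hypothetical illustration, not core.async's own code: a clause expression counts as a binding form only if it is a seq whose first element is a vector, so `(recur)`, whose first element is the symbol `recur`, is treated as an ordinary expression.

```clojure
(defn binding-form?
  "Hypothetical mirror of the rule alt! uses to tell `([x] body)` apart
  from an ordinary expression such as `(recur)`."
  [expr]
  (and (seq? expr) (vector? (first expr))))
```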

hiredman 19:05:08

my next guess is you are blocking up the core.async threadpool somehow, which is preventing the timeout's callback from running

hiredman 19:05:45

and you can test that by running your future and, when it stops looping, running (clojure.core.async.impl.dispatch/run #(println "Hello World")); if nothing prints, then the pool is blocked
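The same check as a function that reports a result instead of relying on noticing missing output. It uses `clojure.core.async.impl.dispatch/run` as in the message above; that is an internal namespace and may change between core.async versions. `dispatch-pool-responsive?` is a hypothetical name.

```clojure
(require '[clojure.core.async.impl.dispatch :as dispatch])

(defn dispatch-pool-responsive?
  "Submits a trivial task to core.async's internal dispatch pool and
  waits up to timeout-ms for it to run. Returns false if it never ran,
  which suggests every pool thread is blocked."
  [timeout-ms]
  (let [ran (promise)]
    (dispatch/run #(deliver ran true))
    (deref ran timeout-ms false)))
```

Calling this from a REPL or a health-check endpoint while the worker appears stuck distinguishes a blocked pool from a worker that simply exited.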

bfay 19:05:32

Oh nice, that's a good tip. Behavior-wise it does seem a bit like it could be a blocked threadpool

hiredman 19:05:41

the way the blocking operations (alt!!, <!!, >!!) are implemented, they are basically the non-blocking versions plus a promise, so even if you don't use any go blocks, you still end up with callbacks running on the core.async threadpool that deliver to the promise
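A simplified sketch of that idea built only on the public `take!` function; it is not core.async's actual `<!!` source, and `blocking-take` is a hypothetical name. When no value is ready, the callback is queued and later dispatched to the core.async pool, which is why a blocked pool can leave a blocking take, or an alt!! waiting on a timeout, stuck forever.

```clojure
(require '[clojure.core.async :as async])

(defn blocking-take
  "Simplified stand-in for <!!: register a non-blocking take whose
  callback delivers to a promise, then block on the promise."
  [ch]
  (let [result (promise)]
    ;; take! returns immediately. The callback runs on the calling
    ;; thread if a value is already available, otherwise later on a
    ;; core.async pool thread when a value arrives or ch closes.
    (async/take! ch (fn [v] (deliver result v)))
    @result))
```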