I've been trying to add cancellation support to my a/thread. Cancelling my own work is easy, but having it also cancel Java calls by interrupting the thread is where it gets tough. The hard bit for me is that I need to make sure the thread is not in an interrupted state after the a/thread body has executed, otherwise it will get returned to the pool with the flag set.
Currently, I have a setup with a separate go block that watches a channel for cancellation and sets the thread's interrupted flag. The a/thread body uses a volatile! to store its Thread/currentThread, so that the watcher go block can interrupt it.
;; Assumes promise-chan, go, <!, thread, poll!, offer! and close!
;; are referred from clojure.core.async.
(defn thread-with-java-style-cancel
  [body-fn]
  (let [ret (promise-chan)
        interrupter-thread (volatile! nil)
        interrupter (go
                      (when (some? (<! ret))
                        (when-some [t @interrupter-thread]
                          (.interrupt t))))]
    (thread
      (vreset! interrupter-thread (Thread/currentThread))
      (try
        (when-not (poll! ret)
          (let [result (try (body-fn)
                            (catch Throwable t
                              t))]
            (vreset! interrupter-thread nil)
            (offer! ret result)
            (close! ret)))
        (finally
          ;; Clear the interrupt flag in case it was set
          (Thread/interrupted))))
    ret))
I "cancel" by simply putting something else from the outside into the ret channel, like so:
(let [result (thread-with-java-style-cancel (fn [] (Thread/sleep 5000)))]
  (offer! result :cancel)
  (close! result))
There's one issue though: I believe it can technically race. If interrupter reads interrupter-thread as non-nil, then the thread sets it to nil and offers a result on the chan, which is dropped (cancel won), the finally then runs and clears the interrupt flag, and only now does interrupter call .interrupt on the thread, so the thread is left with the interrupt flag set.
Maybe we should abandon interruptibility. JS isolates don't allow for interrupts. Idk if maybe wasm is adding them, but I think you have to simulate them in the runtime in wasm, iiuc. And with the non-interruptible, run-to-completion semantics of isolates, thinking about parallelism is way easier. You can still always terminate the isolate, cancelling it or retrying it like a transaction, but that's your only real interrupt. You can always add in logical interrupts at your application level, with explicit checks in your algorithm, if you need backtracking. But it may well be that interruptibility is an anti-pattern.
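To make "logical interrupts with explicit checks" concrete, here is a minimal sketch. The function name, the `cancelled?` atom and the result map shape are all made up for illustration; the point is only that the algorithm decides where it is safe to stop.

```clojure
(defn sum-until-cancelled
  "Hypothetical example: sums xs, checking the cancelled? atom
  before every step instead of relying on a thread interrupt."
  [cancelled? xs]
  (loop [acc 0
         xs  (seq xs)]
    (cond
      ;; Explicit, application-level cancellation check.
      @cancelled? {:status :cancelled :partial acc}
      (nil? xs)   {:status :done :result acc}
      :else       (recur (+ acc (first xs)) (next xs)))))

;; Normal run:        (sum-until-cancelled (atom false) [1 2 3])
;; Already cancelled: (sum-until-cancelled (atom true)  [1 2 3])
```

Because the check only happens at the top of the loop, the process can never be stopped halfway through a step.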
Hard to build an STM without efficient interruptibility though. So an escape hatch would be nice for things like that, rather than having to spawn a whole new thread/isolate
But it's probably an anti-pattern, in the same spirit as programming control flow with exception throwing
It basically makes it impossible for you to have a logically closed process, guaranteed to behave deterministically
Even in the small, unless you can just assume that nobody is calling an interrupt here
But I have almost no experience in actually trying to program with interrupts, so maybe it's just a lack of familiarity
It's a cooperative interrupt. The code won't just arbitrarily stop anywhere. The only gotcha is that people often forget that some APIs check for it and might throw. So if you are blocked on some IO in Java, the Java IO API will check for the interrupt and might throw, and you would need to catch that and clean up the resources associated with the IO. Or if it was a "commit" type of IO, well, why are you cancelling that?
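A small sketch of what "cooperative" means here, using a plain future rather than a/thread to keep it self-contained. The function name and the event keywords are made up for illustration. Thread/sleep stands in for any blocking call that checks the interrupt flag.

```clojure
(defn interrupt-blocked-worker
  "Hypothetical demo: starts a worker that blocks in Thread/sleep,
  interrupts it via future-cancel, and returns what the worker recorded."
  []
  (let [events  (atom [])
        started (promise)
        done    (promise)
        worker  (future
                  (try
                    (deliver started true)
                    ;; Blocking call that checks the interrupt flag and throws.
                    (Thread/sleep 10000)
                    (swap! events conj :finished)
                    (catch InterruptedException _
                      (swap! events conj :interrupted))
                    (finally
                      (swap! events conj :cleaned-up)
                      (deliver done true))))]
    @started                    ; wait until the worker is running
    (future-cancel worker)      ; interrupts the worker thread
    (deref done 2000 :timeout)  ; wait for the worker's finally to run
    @events))

;; (interrupt-blocked-worker) ;; => [:interrupted :cleaned-up]
```

The worker only stops at the blocking call, and its catch/finally still run, so it gets a chance to clean up.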
But in general, you should use with-resource if you have cleanup to do after the IO, so you should be good.
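Assuming "with-resource" here means Clojure's with-open (the counterpart of Java's try-with-resources), a sketch of the cleanup still running on interrupt could look like this. The function name and the fake resource are hypothetical; a real resource would be a stream or a connection.

```clojure
(import '(java.io Closeable))

(defn interrupt-while-holding-resource
  "Hypothetical demo: interrupts a worker blocked inside with-open and
  reports whether the resource's close method was called."
  []
  (let [started  (promise)
        closed   (promise)
        ;; Stand-in for a real IO resource; only records that it was closed.
        resource (reify Closeable
                   (close [_] (deliver closed true)))
        worker   (future
                   (with-open [^Closeable r resource]
                     (deliver started true)
                     (Thread/sleep 10000)))]
    @started
    (future-cancel worker)            ; interrupt the blocked worker
    (deref closed 2000 :not-closed))) ; true once close has run

;; (interrupt-while-holding-resource) ;; => true
```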
That said, I don't know if it's worth the trouble. I have it supported right now. But I'm not sure if it's something people care about or not.
Clojure 1.12.0
user=> (require '[clojure.core.async :as a])
nil
user=> (def t (atom nil))
#'user/t
user=> (a/thread (reset! t (Thread/currentThread)) (.interrupt (Thread/currentThread)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x6919a8a "clojure.core.async.impl.channels.ManyToManyChannel@6919a8a"]
user=> (.isInterrupted @t)
false
user=>
Well, that makes my life so much easier lol. So Java will unset the flag itself when returning (or maybe before reusing) a thread in the pool. If I understood correctly ya?
sure looks like that
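For what it's worth, the same thing can be checked on a plain java.util.concurrent pool, where the executor clears a worker's interrupt flag before it runs the next task (unless the pool is shutting down). Whether the pool behind a/thread behaves the same way in every core.async version is an assumption. The names below are made up for the sketch.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors))

(defn- submit-callable
  "Type-hinted helper so .submit resolves to the Callable overload."
  [^ExecutorService pool ^Callable f]
  (.submit pool f))

(defn flag-seen-by-next-task
  "Hypothetical demo: the first task interrupts its own thread, the second
  task runs on the same single worker and reports the flag it sees."
  []
  (let [pool (Executors/newFixedThreadPool 1)]
    (try
      (let [first-task  (submit-callable
                          pool
                          (fn []
                            (.interrupt (Thread/currentThread))
                            (.isInterrupted (Thread/currentThread))))
            second-task (submit-callable
                          pool
                          (fn [] (.isInterrupted (Thread/currentThread))))]
        {:first @first-task :second @second-task})
      (finally
        (.shutdown pool)))))

;; (flag-seen-by-next-task) ;; => {:first true, :second false}
```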
Hope so haha. As for the problem itself, I think switching to an atom and doing:
(when-some [t (first (swap-vals! interrupter-thread (constantly nil)))]
  (.interrupt t))
Might fix the race.
Hum, it's possible that the thread pool would be smart, but I think in combination with core.async, maybe not, because I think sometimes it will run a completion handler on the same thread and so on. Not sure, but when I remove my own clean up on the interrupted state I start getting some errors elsewhere due to things being interrupted that should not be
Ah... maybe not. I guess I still need to be sure that my go watcher won't interrupt the thread after it is done, because it could in theory do so at a point in time where another task has now picked up the thread again. So ya, I was just stupid a bit, I still need to clear a reference here.
But now this seems to work:
(defn thread-with-java-style-cancel
  [body-fn]
  (let [ret (promise-chan)
        interrupter-thread (volatile! nil)
        interrupter (go
                      (when (some? (<! ret))
                        (when-some [t @interrupter-thread]
                          (.interrupt t))))]
    (thread
      (vreset! interrupter-thread (Thread/currentThread))
      (try
        (when-not (poll! ret)
          (let [result (try (body-fn)
                            (catch Throwable t
                              t))]
            (vreset! interrupter-thread nil)
            (offer! ret result)
            (close! ret)))
        (finally
          (vreset! interrupter-thread nil))))
    ret))
Hum... or I wonder if this can have the same race condition, just even less likely to happen. Even though the thread interrupt will be reset properly by the pool, the go block could outlive this execution, and if it read the thread before it was set to nil, it could interrupt a thread that is now used by something else.
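One way to close that window might be to make "read the thread and interrupt it" and "unregister and clear the flag" mutually exclusive, so the interrupt can never land after the worker has let go of the thread. This is only a sketch under that assumption, not a vetted implementation. The names thread-with-locked-cancel, interrupt-registered! and unregister! are made up, and the locking is kept in plain helper functions rather than inside the go body. It also skips the offer! when body-fn returns nil, since nil can't be put on a channel.

```clojure
(require '[clojure.core.async :as a])

(defn- interrupt-registered!
  "Interrupts the registered worker, if any, while holding the lock."
  [lock target]
  (locking lock
    (when-some [^Thread t @target]
      (.interrupt t))))

(defn- unregister!
  "Must run on the worker thread: unregisters it and clears its interrupt
  flag under the same lock, so no interrupt can arrive afterwards."
  [lock target]
  (locking lock
    (vreset! target nil)
    (Thread/interrupted)))

(defn thread-with-locked-cancel
  [body-fn]
  (let [ret    (a/promise-chan)
        lock   (Object.)
        target (volatile! nil)]
    ;; Watcher: fires on any value on ret, cancel or result.
    (a/go
      (when (some? (a/<! ret))
        (interrupt-registered! lock target)))
    (a/thread
      (locking lock
        (vreset! target (Thread/currentThread)))
      (try
        (when-not (a/poll! ret)
          (let [result (try (body-fn)
                            (catch Throwable t
                              t))]
            ;; After this call the watcher can no longer reach this thread.
            (unregister! lock target)
            (when (some? result)
              (a/offer! ret result))
            (a/close! ret)))
        (finally
          ;; Idempotent; also covers the cancelled-before-start path.
          (unregister! lock target))))
    ret))
```

Cancelling works the same way as before: `(a/offer! ch :cancel)` from the outside. The trade-off is a short critical section on every task, which is roughly the price of making the interrupt and the cleanup atomic with respect to each other.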