#missionary
2024-04-02
Andrew Wilcox 06:04:01

I was reading data from a subprocess and it turns out that for this particular IO operation Java doesn't throw InterruptedException or InterruptedIOException when the thread doing the read is interrupted. So I had a task doing (m/? (m/via m/blk (read-data))), I could interrupt the task, which would interrupt the m/blk thread... but it would just continue to wait to read data. My goal here is to terminate the subprocess on cancellation, which closes the input stream reading from the process, which in turn gets (read-data) to unblock and return, and everything can be cleaned up. Executing a callback on cancellation is easy of course:

(defn on-cancel! [task callback]
  (fn [s f]
    (let [dispose! (task s f)]
      (fn []
        (try (callback) (catch Throwable e (prn e)))
        (dispose!)))))
The task spec says that the canceller must not throw, so I wrap the callback in a try/catch. The disadvantages, of course, are that the exception has nowhere to go, the callback can't do asynchronous operations, and the returned task's completion doesn't wait for the cleanup. If interruption were working I could do
(m/sp
  (try
    (m/? (do-stuff))
    (catch Cancelled e
      (m/? (on-cancel))
      (throw e))))
and Missionary gives me all of that. So I was thinking something like
(defn on-cancel [task cancel]
  (let [dfv (m/dfv)]
    (on-cancel!
     (m/sp
      (m/?
       (m/join (fn [f _] (f))
         (m/attempt
          (m/sp
           (try
             (m/? task)
             (finally
               (dfv :completed)))))
         (m/compel
          (m/sp
           (when (= (m/? dfv) :cancel)
             (m/? cancel)))))))
     (fn []
       (dfv :cancel)))))
This checks whether the cancellation signal has been sent to the child task, not whether the child task managed to respond to the interruption. If the child task hasn't completed yet when we receive cancellation, we run the cancel task. We wait for both the child task and the cancel task (if any) to finish, and finally complete with the completion status of the child task. However, as with try/catch, if the cancel task itself fails, we complete with that failure. Of course it's not as good as try/catch when interruption is working, since it ends up running the cancel task in parallel with the child. If the child task does handle interruption, I imagine the cancel task might not run if the child task manages to complete first, though I suppose that could be fixed by adding a catch Cancelled before the finally.
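
For readers who want to reproduce the underlying JVM behaviour without Missionary, here is a minimal sketch using plain Java interop. It assumes a Unix-like system where `cat` with no input blocks on stdin; the names `proc`, `result` and `reader` are illustrative and not from the discussion. Whether the interrupt is ignored may depend on the JDK, so the first `deref` is only expected, not guaranteed, to time out.

```clojure
;; Hypothetical subprocess: `cat` never writes anything until it gets input.
(def ^Process proc
  (let [^java.util.List cmd ["cat"]]
    (.start (ProcessBuilder. cmd))))

(def result (promise))

;; Reader thread: blocks in InputStream.read, then reports what happened.
(def reader
  (doto (Thread.
         (fn []
           (deliver result
                    (try (.read (.getInputStream proc))
                         (catch Throwable e e)))))
    (.start)))

;; Interrupting the thread is expected to leave the read blocked.
(.interrupt reader)
(deref result 500 :still-blocked)

;; Terminating the subprocess ends the stream, so the read returns
;; (end of stream) or throws, and the promise is delivered either way.
(.destroy proc)
(deref result 5000 :still-blocked)
```

This is why the rest of the thread focuses on getting a cleanup action to run at cancellation time rather than relying on thread interruption.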

Andrew Wilcox 08:04:50

I haven't tried it yet, but it occurred to me that maybe all I'd need is a task that runs when it's cancelled, perhaps something like

(defn on-cancel [task]
  (let [dfv (m/dfv)]
    (on-cancel!
     (m/compel
      (m/sp
       (m/? dfv)
       (m/? task)))
     (fn []
       (dfv nil)))))

leonoel 10:04:52

This is the cleanest way to run an action on cancellation IMO

(defn on-cancel [task cb]
  (m/race (m/sp (try (m/? m/never) (finally (cb)))) task))
I think for this scenario it's OK to ignore exceptions from cb: if the channel cannot be closed, then it's very likely in a non-readable state as well, so the reader process will eventually crash.

Andrew Wilcox 10:04:01

Ah yes, that's elegant.

leonoel 09:04:35

Sorry, the correct version is

(defn on-cancel [task cb]
  (m/absolve
    (m/race (m/sp (try (m/? m/never) (finally (cb))))
      (m/attempt task))))
Without the absolve/attempt sandwich, if the task crashes then race will wait for the left-hand branch to complete, which never happens.
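
As a usage sketch, the corrected helper could be applied to the subprocess scenario from the start of the thread roughly as follows. The `cat` subprocess, `read-output` and `outcome` are hypothetical names introduced for illustration, and a Unix-like system is assumed. Since race cancels the losing branch once the other succeeds, cb presumably also runs when the task finishes on its own, which is harmless for `Process.destroy`.

```clojure
(require '[missionary.core :as m])

;; The corrected helper from above, repeated so the sketch is self-contained.
(defn on-cancel [task cb]
  (m/absolve
    (m/race (m/sp (try (m/? m/never) (finally (cb))))
      (m/attempt task))))

;; Hypothetical subprocess: `cat` blocks on stdin, so its output never ends.
(def ^Process proc
  (let [^java.util.List cmd ["cat"]]
    (.start (ProcessBuilder. cmd))))

;; Blocking read on the m/blk executor; interruption alone is assumed
;; not to unblock it.
(def read-output
  (m/via m/blk (slurp (.getInputStream proc))))

(def outcome (promise))

;; A task is run by calling it with success and failure callbacks;
;; the returned function is its canceller.
(def cancel!
  ((on-cancel read-output #(.destroy proc))
   #(deliver outcome [:success %])
   #(deliver outcome [:failure %])))

;; Cancelling runs the callback, which destroys the process and unblocks the read.
(cancel!)
(deref outcome 5000 :timeout)
```

Depending on how the read ends once the process is gone, the task may complete with whatever was read or with an exception; either way it should terminate instead of hanging.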

Andrew Wilcox 09:04:35

I think for my use case I may not actually need to run the task in parallel, I just need to run a cleanup task on cancel, so my current version is

(defn when-cancelled [on-cancel]
  (m/sp
   (try
     (m/? m/never)
     (finally (m/? (m/compel on-cancel))))))

bbss 10:04:09

(def non-empty-process--watched
  (let [!atom (atom nil)]
    [((->>
       (mi/cp (let [never-empty (mi/?< (->> !atom
                                            mi/watch
                                            (mi/eduction (filter identity))
                                            mi/signal))]
                (js/console.log never-empty) ;;prints [{…}, false, false, false, Array(5)] before swap on atom.
                ))
       (mi/reduce {} {})) js/console.log js/console.warn) !atom]))

(reset! (second non-empty-process--watched) 'value) ;;works
I'm trying to understand if this should be possible. The signal publishes an initial value that I don't understand.

leonoel 11:04:48

The flow passed to mi/signal is indeed uninitialized. The observed initial state is likely a leak of some internal object. I suspect mi/signal fails to detect the lack of initialization and corrupts its state instead of showing a meaningful error.
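
One possible way to avoid the uninitialized flow, sketched under the assumption that ignoring nil downstream is acceptable: pass the watch to the signal directly, so it always has a current value, and skip nil inside the cp block rather than filtering before the signal. The names `!state`, `<state`, `seen` and `main` are illustrative, and a plain atom stands in for js/console.log so the sketch also runs on the JVM.

```clojure
(require '[missionary.core :as mi])

(def !state (atom nil))

;; mi/watch always has a current value (here nil), so the signal is initialized.
(def <state (mi/signal (mi/watch !state)))

;; Collects the non-nil values seen by the cp block.
(def seen (atom []))

(def main
  (mi/reduce (constantly nil) nil
    (mi/cp
      ;; Skip the initial nil here instead of filtering before the signal.
      (when-some [v (mi/?< <state)]
        (swap! seen conj v)))))

;; Run the task; the returned function cancels it.
(def cancel! (main (fn [_]) (fn [_])))

(reset! !state :value)
```

Calling `(cancel!)` afterwards tears down the watch and the signal.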

bbss 12:04:00

Understood, thank you.