#missionary
2024-04-16
Eric Dvorsak09:04:45

What is the execution context of the success and error continuation functions, and are their limitations documented somewhere? As I mentioned yesterday, I have noticed cases where some async(?) code in these functions did not seem to run (timbre/log not logging anything, but println working fine).

leonoel09:04:02

the limitations are:
• don't block the thread
• don't throw exceptions
• return quickly

Eric Dvorsak09:04:34

So in the task specification, the sentence "it must return a canceller, must not throw and must not block the calling thread" also applies to the success and error continuation functions?

Eric Dvorsak09:04:48

or is it specific to missionary's implementation of the spec?

leonoel09:04:04

but it should not matter if timbre/log is wrapped in an m/via

leonoel09:04:58

it is not implementation specific

Eric Dvorsak09:04:58

ok, it's an interesting gotcha because one doesn't necessarily think that a log statement might be doing crazy async stuff in the background

Eric Dvorsak09:04:09

that got me worried: over the past few days of experimenting I've had quite a few runs where I thought missionary was not properly terminating the task, because it wasn't logging from the success continuation

Eric Dvorsak09:04:22

and I didn't suspect a log statement to fail

Eric Dvorsak09:04:38

So I assume because timbre must be blocking the thread or something, it falls into undefined behavior

leonoel09:04:17

yes I would not have expected that either

leonoel09:04:05

missionary operators could be more resilient to callback abuse, e.g. redirecting exceptions to stderr

Eric Dvorsak09:04:22

so these callbacks are not regular clojure functions, they run in some coroutine context that might terminate them early

leonoel09:04:42

callbacks are machinery, user is not supposed to manipulate them

Eric Dvorsak09:04:04

So the recommended pattern would be to run the task at the top of the supervision tree with no-op callbacks, i.e. (task (constantly nil) (constantly nil))? And if you want to do something on success, the top-level task would be:

(m/sp (let [res (m/? (actual-task))] (m/? (on-success res))))
and a try/catch for on-error?

leonoel09:04:55

yes, that's the recommended pattern
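
A minimal sketch of the pattern confirmed above, assuming missionary is on the classpath. The names `actual-task`, `on-success` and `on-error` are hypothetical placeholders taken from the question, not missionary functions, and println stands in for any blocking side effect such as logging. Success and failure handling live inside the top-level `m/sp`, blocking work is wrapped in `m/via`, and the callbacks passed at the top do nothing.

```clojure
(require '[missionary.core :as m])

;; Hypothetical placeholders for illustration only.
(defn actual-task [] (m/sp (+ 1 2)))
(defn on-success [res] (m/via m/blk (println "success:" res)))
(defn on-error [e] (m/via m/blk (println "failure:" (ex-message e))))

;; Top-level task: handling happens inside the task, not in the callbacks.
;; Note that on-error also runs if on-success itself fails.
(def main
  (m/sp
    (try
      (m/? (on-success (m/? (actual-task))))
      (catch Exception e
        (m/? (on-error e))))))

;; Run it with callbacks that do nothing; the returned function cancels the task.
(def cancel! (main (constantly nil) (constantly nil)))
```

Keeping the callbacks trivial means they cannot block, throw, or run for long, which are the three limitations listed earlier.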

Eric Dvorsak10:04:56

Thanks! Is that somewhere in the doc/wiki that I missed?

leonoel10:04:22

I don't think it's anywhere

Eric Dvorsak10:04:11

I can write it down in an issue so you can edit/copy it into the docs. Seems like an important detail, isn't it basically how one would use missionary in prod?

leonoel10:04:43

yes I agree it's important, you can file an issue

Eric Dvorsak10:04:25

@U053XQP4S here it is: https://github.com/leonoel/missionary/issues/105 I think there are also other ways to handle the error, maybe something with m/compel. Also, would unsuspected async code like timbre log behave unexpectedly as well if it is run outside of m/via? e.g.:

(defn task []
  (m/sp
    (let [res (m/? (actual-task))]
      (log/info "SUCCESS!"))))

Eric Dvorsak10:04:08

I'll refine it through the day.

Eric Dvorsak20:04:27

So yes, even inside the task it's not safe to use timbre logging (at least directly). I was logging the error:

(defn task []
  (m/sp
    (try
      (m/? (actual-task))
      (catch Exception e
        (log/error "OOPS")))))
It's actually the same root cause that was preventing the logs from appearing in the success and cancel continuation functions: log can throw a java.lang.InterruptedException from a deref in:
[taoensso.encore$eval3772$get_hostname__3773 invokePrim encore.cljc 5621]
When that happens the exception is silenced. Interestingly, I finally found it by wrapping the log statement in the catch block with its own try/catch block.

leonoel06:04:32

Just to be clear, when you say the exception is silenced you mean due to log/error throwing another exception the original e is lost ?

Eric Dvorsak07:04:06

No they are both lost if I don't add a try catch around the log statement

Eric Dvorsak07:04:26

Also, I have to run a test: I'm starting to wonder if http-kit is causing the interruption, as it uses Loom threads.

leonoel08:04:46

ok that's definitely not right, would be great if you could provide a full repro

Eric Dvorsak08:04:48

@U053XQP4S interestingly, without using http-kit newVirtualThreadPerTaskExecutor I don't get that exception anymore. It still takes more than 30 sec (sometimes minutes) between the print I have right before throwing and the print before the log in the catch statement (so theoretically nothing in between). I will try to make a much more minimal repro.

Eric Dvorsak08:04:07

but yeah, without virtual threads timbre logs work fine within missionary

Eric Dvorsak09:04:18

@U053XQP4S I refined the issue: https://github.com/leonoel/missionary/issues/107 Writing a repro really helped narrow down the issue; the repro now only needs missionary.

Eric Dvorsak09:04:30

So essentially a blocking call is as much of an issue in the task as it was in the success and error continuation functions, but where I got confused is that it seems to only cause an actual bug when these conditions are met:
• throwing an exception
• from the m/blk executor
• in a m/reduce

leonoel12:04:28

I do not think these snippets show any exception being swallowed

Eric Dvorsak12:04:30

what do you mean? in the last one in particular, you can see everything prints, but if you remove the try-catch around the deref/future it stops printing after "Printing in future"

leonoel12:04:19

That is the correct behavior. deref throws InterruptedException, it is not caught therefore it propagates to the sp task, then you get it in the failure callback

leonoel12:04:00

Are you challenging that deref throws ?

Eric Dvorsak12:04:29

I guess I'm confused that it only throws when all the above conditions are met

leonoel12:04:12

I do not consider it a bug the fact that user code can be run on a thread in interrupted state. While missionary generally discourages blocking outside of m/via, it may sometimes be acceptable if the blocking is quick, in this case the blocking call should ignore thread interruption.
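
A minimal sketch of the mechanism under discussion, using only Clojure core and Java interop (no missionary). The helper name `deref-on-interrupted-thread` and the `gate` promise are hypothetical. It assumes, as described in this thread, that a blocking deref of a not-yet-completed future checks the thread's interrupt flag; the gate keeps the future pending so the check is always reached.

```clojure
(defn deref-on-interrupted-thread
  "Runs a blocking deref on a thread whose interrupt flag is already set.
  Returns :interrupted if deref threw InterruptedException, :completed otherwise."
  []
  (let [result (promise)
        gate   (promise)   ; keeps the future pending while deref is attempted
        t      (Thread.
                 ^Runnable
                 (fn []
                   ;; Simulate user code running on a thread in interrupted state.
                   (.interrupt (Thread/currentThread))
                   (deliver result
                            (try
                              @(future @gate)
                              :completed
                              (catch InterruptedException _ :interrupted)
                              (finally (deliver gate :go))))))]
    (.start t)
    (deref result 5000 :timeout)))

(deref-on-interrupted-thread)
```

Any library call that blocks this way internally, such as a logger resolving a hostname through a future, can fail for the same reason when it runs on a thread left in interrupted state.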

Eric Dvorsak12:04:52

that is the case here, it's just printing

leonoel12:04:15

log statements arguably fall in this category and I find questionable the decision to make timbre logs interruptible

Eric Dvorsak12:04:15

the error is indeed in the callback but then it will throw again if eg one logs errors with timbre because it will again use deref

Eric Dvorsak12:04:12

I agree with you about that. I don't think it's explicitly interruptible, it just happens to use a promise/future/deref

Eric Dvorsak12:04:18

how come the interrupted state is only there when throwing in a blocking task while reducing over a flow? e.g. this works:

((m/sp
     (try
       (m/? (m/reduce
             (constantly nil)
             (m/ap (let [s (m/?> (m/seed [1 2 3]))]
                     (m/? (m/sp (throw (ex-info "BOOM" {}))))))))
       (catch Exception e
         (println "Catching the exception")
         @(future (println "Printing in future"))
         (println "This would never print without catching the exception from deref")))
     @(future (println "Printing in future outside catch"))
     (println "And this prints because the deref above doesn't blow up"))
   (constantly nil) println)

leonoel13:04:33

that's an implementation detail of m/via

Eric Dvorsak13:04:59

how can one safely ignore thread interruption in exception handling code?

leonoel13:04:37

(defn uninterrupted [f]
  (let [i (Thread/interrupted)]
    (try (f)
         (finally
           (when i (.interrupt (Thread/currentThread)))))))

leonoel13:04:38

((m/sp
   (try
     (m/? (m/reduce
            (constantly nil)
            (m/ap (let [s (m/?> (m/seed [1 2 3]))]
                    (m/? (m/via m/blk (throw (ex-info "BOOM" {}))))))))
     (catch Exception e
       (println "Catching the exception")
       (uninterrupted #(deref (future (println "Printing in future"))))
       (println "This would never print without catching the exception from deref")))
   @(future (println "Printing in future outside catch"))
   (println "And this prints because the deref above doesn't blow up"))
 prn prn)
this will print past the catch
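
A small self-contained sketch of what the `uninterrupted` helper above does, without missionary. The helper is repeated verbatim so the block runs on its own; `demo` is a hypothetical name. The thread interrupts itself, runs a blocking deref through `uninterrupted`, and then reports both the value and whether the interrupt flag was restored afterwards.

```clojure
;; Same helper as above: clear the interrupt flag, run f, restore the flag.
(defn uninterrupted [f]
  (let [i (Thread/interrupted)]
    (try (f)
         (finally
           (when i (.interrupt (Thread/currentThread)))))))

(defn demo
  "Returns [value flag-restored?] computed on a thread that starts interrupted."
  []
  (let [result (promise)
        t      (Thread.
                 ^Runnable
                 (fn []
                   (.interrupt (Thread/currentThread))
                   (let [v (uninterrupted #(deref (future :ok)))]
                     (deliver result
                              [v (.isInterrupted (Thread/currentThread))]))))]
    (.start t)
    (deref result 5000 :timeout)))

(demo)
```

Restoring the flag in `finally` matters because code further up the stack may still rely on the interruption to shut down.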

Eric Dvorsak13:04:30

Thank you very much for helping me debug through that!

Eric Dvorsak13:04:08

Now I'm aware of how it works and won't trip on it anymore, but I'm worried that newcomers might be quite confused by the difference in behavior

Eric Dvorsak13:04:43

technically it is documented in via doc "Cancellation interrupts the evaluating thread."

leonoel13:04:44

np, thank you for this feedback I will think about possible ways to mitigate this or at least document it better

leonoel13:04:02

yes, but what is not obvious is user code can be called from an interrupted thread even in a non-cancelled process

leonoel13:04:13

let me know if you find any quirks with virtual threads, I've not experimented much with project loom yet

Eric Dvorsak13:04:45

I'm trying to write a small issue describing the problem as temporary documentation. I'm still not sure I understand why this behavior of via only causes the issue when the via is in a flow that is reduced; if it's just in a sp it works fine:

((m/sp
   (try
     (m/? (m/via m/blk (throw (ex-info "BOOM" {}))))
     (catch Exception e
       (println "Catching the exception")
       @(future (println "PRINTING"))
       (println "This prints because the above doesn't blow up")))
   @(future (println "PRINTING"))
   (println "And this prints because the deref above doesn't blow up"))
 (constantly nil) println)

Eric Dvorsak13:04:44

oh, I think I get it: it's because there's only 1 blocking task and it throws, so it's not cancelled

leonoel13:04:11

I don't know the answer but it's clearly implementation specific

Eric Dvorsak13:04:22

Right I can reproduce like this as well:

((m/sp
   (try
     (println "Running")
     (m/? (m/join (m/via m/blk (throw (ex-info "BOOM" {})))
                  (m/via m/blk (throw (ex-info "BOOM2" {})))
                  (m/sleep 10000)))
     (catch Exception e
       (println "Catching the exception")
       @(future (println "WONT PRINT 1"))
       (println "WONT PRINT 2")))
   @(future (println "WONT PRINT 3"))
   (println "WONT PRINT 4"))
 (constantly nil) println)

leonoel13:04:01

yes, you just need a task that is terminated by cancelled m/via

Eric Dvorsak13:04:23

but what does interrupting the evaluating thread bring? what do other missionary constructs do, just throw a missionary.Cancelled exception?

leonoel13:04:03

that is the point of process supervision, the pending threads must shut down cleanly to run finally blocks and release resources

leonoel13:04:11

Cancelled is just a special value to indicate the process could not run to completion, like InterruptedException

Eric Dvorsak13:04:15

oh I see, the part about finally blocks is mentioned here: https://cljdoc.org/d/missionary/missionary/b.36/doc/readme/tutorials/hello-task?q=finally#parallel-composition I guess last time I read through it I wasn't ready to really understand

Eric Dvorsak13:04:57

wouldn't it make sense that compel also inhibits the interruption and not just cancellation?

leonoel13:04:41

not really, all operators rely on the idea that it doesn't matter which thread runs user code (except m/via, but you can still m/compel a m/via to inhibit interruption)
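
A hedged sketch of the last point, assuming missionary is on the classpath: wrapping a `m/via` in `m/compel` so that cancelling the task does not interrupt the blocking thread. The names `cleanup` and `run-and-cancel` are hypothetical, and Thread/sleep stands in for any interruptible blocking call.

```clojure
(require '[missionary.core :as m])

;; Hypothetical cleanup step whose blocking body should not be interrupted.
(defn cleanup []
  (m/compel
    (m/via m/blk
      (Thread/sleep 100)
      :cleaned)))

(defn run-and-cancel
  "Starts the task, requests cancellation immediately, and waits for the outcome."
  []
  (let [result  (promise)
        cancel! ((cleanup)
                 (fn [v] (deliver result [:success v]))
                 (fn [e] (deliver result [:failure e])))]
    (cancel!)   ; m/compel inhibits this signal, so the via body keeps running
    (deref result 5000 :timeout)))

(run-and-cancel)
```

The callbacks only deliver to a promise, which keeps them quick, non-blocking and non-throwing.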

leonoel13:04:43

what could make sense is to ensure m/via always clears interruption flag before running callbacks, but I have to check the implications first

Eric Dvorsak13:04:33

I think the biggest issue is that this can cause the unclean shutdown of supervised processes, since the interrupted state will cause any (accidentally) interruptible code in a finally block to throw

Eric Dvorsak14:04:46

as in:

((m/sp
   (try
     (println "Running")
     (m/? (m/join []
                  (m/sp (try
                          (m/? (m/sleep 1000))
                          (finally
                            (println "finally task")
                            @(future (println "preparing cleanup task"))
                            (println "cleanup task DONE"))))
                  (m/via m/blk (throw (ex-info "BOOM" {})))))

     (finally
       (println "finally supervisor")
       @(future (println "preparing cleanup supervisor"))
       (println "cleanup supervisor DONE"))))
 println println)

leonoel19:04:47

@U03K8V573EC I released b.37. The continuation of m/via now runs after clearing the interruption flag, which means interruptible user code should not be interrupted anymore.

Eric Dvorsak21:04:52

Thanks it worked!
