Fork me on GitHub
#missionary
<
2024-01-17
>
Olav Fosse10:01:16

How should I go about learning JVM async and concurrency? Open to book suggestions etc :^). Mostly interested in learning what underlies Missionary.

Olav Fosse16:01:04

For learning purposes I tried implementing mi/seed by hand. Is this correct? Any bugs?

(defn seed [coll]
  (fn flow [notifier terminator]
    (let [remaining (atom (seq coll))]
      (notifier)
      (reify
        IFn
        (invoke [iter])
        IDeref
        (deref [iter]
          (let [retval (first @remaining)]
            (swap! remaining rest)
            (if (empty? @remaining)
              (terminator)
              (notifier))
            retval))))))

leonoel16:01:45

for an empty collection it should call terminator immediately

leonoel16:01:53

the actual implementation also crashes on cancellation, which prevents an infinite sequence from running into an infinite loop

Olav Fosse16:01:31

ah true! I’ll have a look at the cancellation implementaiton 🤓 thanks!

Olav Fosse17:01:07

From https://github.com/leonoel/flow#specification: > It may throw an exception to indicate a failure, in this case it must not call the notifier again. Would it be correct to say: > It may throw an exception to indicate a failure, in this case it must not call the notifier again. It must throw an exception if terminator has been called.

leonoel17:01:46

you mean what should happen if the consumer calls deref without having been notified ?

Olav Fosse17:01:41

I just misunderstood the mi/seed cancellation mechanism. I assumed that after cancelling it would crash by making deref throw an exception. I’ll take a further look at mi/seed cancellation tomorrow :^)

leonoel21:01:45

> I assumed that after cancelling it would crash by making deref throw an exception. This is correct

Olav Fosse09:01:50

I got my terminology conflated in my previous message. I meant to say: > It may throw an exception to indicate a failure, in this case it must not call the notifier again. It must throw an exception if the process has been cancelled. Anyways, I think I’ve got a correct implementation now 😅

(defn seed [coll]
  (fn flow [notifier terminator]
    (let [remaining (atom (seq coll))
          cancelled (atom false)]
      (if (empty? @remaining)
        (terminator)
        (notifier))
      (reify
        IFn
        (invoke [iter] (reset! cancelled true))
        IDeref
        (deref [iter]
          (let [retval (first @remaining)]
            (swap! remaining rest)
            (if (or (empty? @remaining) @cancelled)
              (do (terminator)
                  (when @cancelled (throw (Cancelled. "Seed cancelled"))))
              (notifier))
            retval))))))

Olav Fosse09:01:51

This missionary stuff is really heavy on my brain so sorry for confusing things 😅

leonoel09:01:03

> It must throw an exception if the process has been cancelled. It is a missionary convention to crash with an instance of missionary.Cancelled if a process is cancelled before self-termination. The flow spec doesn't enforce it