Fork me on GitHub
#missionary
<
2022-07-03
>
zeitstein17:07:39

Debounce example from https://github.com/leonoel/missionary/blob/master/doc/tutorials/hello_flow.md#preemptive-forking spelled out:

24 79  67  34  18  9   99  37   ;; clock flow val
24 103 170 204 222 231 330 367  ;; val gets to bounce, bounce delay starts
74 153 220 254 272 281 380 417  ;; bounce delays until
       204 222 231     367      ;; bounce terminated/cancelled by (read ?< docstring)
I saw others struggled with this one too 🙂 Note from a beginner: "non-preemptively" in ?> doc was confusing without reading the doc for ?<. Would adding something like "process for newly emitted value is run after the previous val's process completes" be correct? Or rather just the prev process not being cancelled by new val being emitted?

Dustin Getz17:07:49

if someone will copy all missionary docstrings into a nicely formatted notion database, i will do a pass on de-jargoning docstrings and adding simple examples, and then present them to Leo to validate and get merged. it can be a living document as well for us all to contribute examples

yes 1
zeitstein17:07:52

fn name | args | doc | examples ?

Dustin Getz17:07:17

sure, don't over think it

leonoel17:07:50

The right place for jargon-free material is the tutorials. I don't want to de-jargonize docstrings, I think jargon is useful if properly defined, and the right place for it is the reference documentation because it's the only place where you can be absolutely precise. Examples, however, are more than welcome.

Richie17:07:52

I also had to draw out the debounce example.

👍 1
Dustin Getz18:07:55

@U053XQP4S I would like to try anyway, I am confident that we can produce accessible docstrings that are also acceptable to you

👍 1
zeitstein18:07:04

https://zeitstein.notion.site/0af2161828524d12829f8b8ef199066e?v=0d359987e31c4a8f9e488f72745a12d9 Notion format ok? The thinking is that multiple or long examples would not be readable in a column.

👍 1
Dustin Getz18:07:07

that's perfect with the examples inside the item modal

zeitstein18:07:44

In examples, missionary fns are sometimes namespaced, sometimes not. Should standardise? m/* is longer, but m/reduce is clearer.

Dustin Getz18:07:22

just copy paste

leonoel18:07:39

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html this is the level of detail I'd like docstrings to have. drawings are highly valuable here

👍 1
👀 1
zeitstein19:07:47

I think I'm done copying to Notion.

zeitstein19:07:01

One very minor thing: would be nice for args names (`r`) to match names in docs (`reference`), eg:

zeitstein04:07:03

More confusing example of above

Richie18:07:59

I'm trying to understand why this code doesn't work. I have an atom of a map of [keyword flow] pairs and I want to print the map with the latest value for each flow. e.g. I expect to see current db: {:first 2 :second 1} but I only see current db: {:first 2}. Thanks!

Richie18:07:25

(let [first-count (atom 0)
      second-count (atom 0)
      flow-db (atom {})]
  (println "BEGIN EXPERIMENT")
  ((m/reactor
    (m/stream!
     (m/ap (let [db (m/?> (m/watch flow-db))
                 _ (println "new db:" db)
                 current (m/?> (apply m/latest (fn [& args] (do (println "zipmap" (keys db) args)
                                                                (zipmap (keys db) args)))
                                      (vals db)))]
             (println "current db:" (pr-str current))))))
   prn prn)
  (println "assoc first")
  (swap! flow-db assoc :first (->> (m/watch first-count)
                                   (m/eduction (map #(do (println "first-count" %) %)))))
  (println "assoc second")
  (swap! flow-db assoc :second (->> (m/watch second-count)
                                    (m/eduction (map #(do (println "second-count" %) %)))))
  (println "inc first")
  (swap! first-count inc)
  (println "inc second")
  (swap! second-count inc)
  (println "inc first")
  (swap! first-count inc))

;; BEGIN EXPERIMENT
;; new db: {}
;; zipmap nil nil
;; current db: {}
;; assoc first
;; new db: {:first #object[Function]}
;; first-count 0
;; zipmap (:first) (0)
;; current db: {:first 0}
;; assoc second
;; inc first
;; first-count 1
;; zipmap (:first) (1)
;; current db: {:first 1}
;; inc second
;; inc first
;; first-count 2
;; zipmap (:first) (2)
;; current db: {:first 2}

Dustin Getz19:07:11

For continuous flows (like m/watch on an atom), the atom will never "terminate" naturally so you need to fork with m/?< ("switch"). If you use m/?> ("concat") it will wait for the current flow to terminate, which doesn't naturally occur for a continuous flow derived from atom, so it never continues

👍 1
Dustin Getz19:07:37

(Generally, continuous flows are defined for all time and don't naturally terminate until cancelled or crash, IIUC. So you most often want m/?< when working in continuous time iiuc)

👍 1
Dustin Getz19:07:24

In your example, changing m/?> to m/<? in two places doesn't actually produce the correct behavior yet – I am still trying to understand why

👍 1
Richie19:07:25

Oh. It seems that I do not understand ?< and ?>. I can probably learn about this on the http://reactivex.io site. I was thinking of ?< as interrupting the rest of the ambiguous process and ?> as letting the rest of the ambiguous process finish before backtracking.

Dustin Getz19:07:26

what you wrote ^ matches my understanding; you don't want to wait for the process (atom watch) to terminate because it never will

leonoel19:07:54

it is correct, you certainly want ?< here. you also need to catch the missionary.Cancelled exception that will be thrown by the nested flow when it will be cancelled, otherwise the reactor will crash

👀 1
leonoel19:07:39

in reactivex terminology, ?< is switchMap and ?> is concatMap

👀 1
Richie19:07:47

Hmm. Dustin, let me try working that differently. I was thinking of ?> as taking one value out of the flow and running the rest of the ap with that value before backtracking to ?>. From your description of ?> (aka "concat") it sounds like it won't continue with the ap until the flow (i.e. (?> flow)) is done.

Dustin Getz19:07:58

Leo can you :thumbsup: each correct comment I made and any errors please, just to be sure (now and in future)

Richie20:07:48

(let [first-count (atom 0)
      second-count (atom 0)
      flow-db (atom {})]
  (println "BEGIN EXPERIMENT")
  ((m/reactor
    (m/stream!
     (m/ap (let [db (m/?< (m/watch flow-db))
                 current (try (m/?< (apply m/latest (fn [& args] (zipmap (keys db) args))
                                           (vals db)))
                              (catch js/Object _))]
             (println "current db:" (pr-str current))))))
   prn prn)
  (swap! flow-db assoc :first (m/watch first-count))
  (swap! flow-db assoc :second (m/watch second-count))
  (swap! first-count inc)
  (swap! second-count inc)
  (swap! first-count inc))

;; BEGIN EXPERIMENT
;; current db: {}
;; current db: {:first 0}
;; current db: nil
;; current db: {:first 0, :second 0}
;; current db: {:first 1, :second 0}
;; current db: {:first 1, :second 1}
;; current db: {:first 2, :second 1}

Richie20:07:13

Thanks Leo. I never knew that I needed to catch the Cancelled.

Richie20:07:07

I initially wrote missionary code with ?< without knowing why and switched to ?> because I worried that interrupting the ap would give confusing nondeterministic behavior in my print statements. I didn't understand the difference between ?< and ?>.

Richie20:07:40

Is that the best place for the try catch?

Dustin Getz20:07:35

I think so yes, what's happening is • the m/latest is being rebuilt (because the (vals db) has changed), • which means it needs to cancel the old m/latest, • which propogates cancellation to the child processes which is :first flow (a m/watch); • the watch receives cancellation signal (request) and then terminates by throwing missionary.Cancelled, • the Cancelled exception propagates upstream through the DAG to make sure any sibling processes are cancelled in deterministic fashion • userland needs to stop the propagation or the entire supervision tree will cancel

Dustin Getz20:07:53

This is surprising that we need to catch the error, why? • i think the code can be rewritten so that the :first flow is reused, instead of cancelled and rebuilt • i think the dynamic dependency on the number of flows in the map is the root of this? • are m/ap m/cp m/sp blocks are intended to be not dynamic? – this is the edge of my understanding

Dustin Getz21:07:57

I moved the Cancelled into the tightest place possible. I was unable to make the cancel go away entirely, due to the dynamic nature of needing to rebuild an m/latest when (vals db) changes

Dustin Getz21:07:08

the cancellation also causes an undesired nil propagation frame; Leo how can this be avoided?

Dustin Getz21:07:42

I see now that the cancelled is fairly harmless, we can just move the try/catch to the edge of the system and silence it; it is the old monadic layer that is being cancelled and throwing Cancelled to "finish up", the next monadic layer about to be created is unimpacted. The monadic structure here is complex, only the nested process is cancelled, the >>xs parent process is not

Dustin Getz21:07:42

I don't understand why an outer try/catch should see the nested frame's cancellation at all. It feels like the exception is crossing from a nested monadic layer (which is terminating) to an outer monadic layer (which is just switching)? I may misunderstand try/catch. I expected try/catch to be scoped to a single monadic layer

Dustin Getz21:07:28

Is "frame" == "monadic layer" ?

Dustin Getz21:07:45

Or is "frame" == the entirely flatMapped concrete process state

Dustin Getz21:07:22

Since everything is flattened, I guess the exception must pass through the outer try/catch on it's way to death and we don't care, we just continue.

Dustin Getz21:07:11

Therefore really the confusion is an interaction between the routine/uneventful death of a nested frame and the reactor, whose contract is to terminate if an exception is seen. Why does the reactor terminate under routine/uneventful exception?

Dustin Getz21:07:42

Summary: the process state has been forked. The old process is crashing to its death. Then the new process begins, resuming from the forked state. (The verb "fork" now makes sense to me; I never really thought about it.) Since process death is a routine event, the only confusing thing is why the reactor terminates when it sees a process die, thereby bringing another active process down with it. Idea: change reactor termination criteria to be: terminate when there is no longer an active process.

Richie21:07:41

My thought on https://clojurians.slack.com/archives/CL85MBPEF/p1656884022062959?thread_ts=1656873299.996749&amp;cid=CL85MBPEF is that we have a pattern that we recognize as a monad and it's built on top of Clojure which doesn't know about our monad. Why do you expect try/catch to be scoped to a "single monadic layer"? Did missionary customize try/catch behavior? I'm sincerely asking since there's so much I don't know.

Richie21:07:52

I quoted "single monadic layer" since I'm not sure that that's the best wording for it. I think I know what you mean though.

🙂 1
Richie22:07:41

"I'm not sure that that's the best wording for it" in the sense that I'm not sure that it means the same thing for both of us and I don't have better words myself. By reusing your words, you might think that I'm on the same page when I'm actually not. I don't mean to suggest that you could have used better words.

❤️ 1
Richie22:07:04

Does ?< only throw when parked or does it interrupt the body?

((m/relieve {}
            (m/ap (m/?< (m/seed (range 10)))
                  (println "alice")
                  (println "bob")))
 prn prn)
Prints "alice\nbob\n" 10 times. Can it ever print "alice" twice in a row skipping "bob"?

Richie22:07:54

(let [first-count (atom 0)
      second-count (atom 0)
      flow-db (atom {})]
  ((m/reactor
    (m/stream!
     (m/ap (try
             (let [db (m/?< (m/watch flow-db))
                   current (m/?< (apply m/latest (fn [& args] (zipmap (keys db) args))
                                        (vals db)))]
               (println "current db:" (pr-str current)))
             (catch Cancelled _)))))
   prn prn)
  (swap! flow-db assoc :first (m/watch first-count))
  (swap! flow-db assoc :second (m/watch second-count))
  (swap! first-count inc)
  (swap! second-count inc)
  (swap! first-count inc))

;; current db: {}
;; current db: {:first 0}
;; current db: {:first 0, :second 0}
;; current db: {:first 1, :second 0}
;; current db: {:first 1, :second 1}
;; current db: {:first 2, :second 1}
Right... So, emphasis on the idea of forking over the idea of monad.

Richie22:07:01

Since I need to catch the Cancelled just inside the ap and since not catching it at all cancels the reactor then it's that I'm cancelling the fork but I haven't forked the reactor so I don't want to cancel that.

Richie22:07:59

(let [first-count (atom 0)
      second-count (atom 0)
      flow-db (atom {})]
  ((m/reactor
    (m/stream!
     (try (m/ap (let [db (m/?< (m/watch flow-db))
                      current (m/?< (apply m/latest (fn [& args] (zipmap (keys db) args))
                                           (vals db)))]
                  (println "current db:" (pr-str current))))
          (catch Cancelled _))))
   #(println :success %) #(println :cancelled %))
  (swap! flow-db assoc :first (m/watch first-count))
  (swap! flow-db assoc :second (m/watch second-count))
  (swap! first-count inc)
  (swap! second-count inc)
  (swap! first-count inc))

;; current db: {}
;; current db: {:first 0}
;; :cancelled #object[Object [object Object]]

Dustin Getz22:07:26

> Why do you expect try/catch to be scoped to a "single monadic layer"? Did missionary customize try/catch behavior? m/ap blocks are macros that alter clojure syntax using cloroutine to implement forking special forms as macro breakpoints. Whether or not that justifies the quoted intuition - i don't know. In that intuition, I am framing m/ap as do-notation for process monad. Currently I am confused and feeling uncertain

Richie22:07:08

Ok, I see. Thanks.

Dustin Getz22:07:56

https://clojurians.slack.com/archives/CL85MBPEF/p1656886564654559?thread_ts=1656873299.996749&amp;cid=CL85MBPEF I think forking primitives cannot interrupt clojure/jvm/js, and "interrupt" i think is the wrong metaphor. m/ap first will break apart the body at the cloroutine breakpoints (m/?< and friends). So conceptually cloroutine thunks (println "alice") (println "bob") into an opaque continuation. The reactor will run the propagation frame to completion (planning an efficient and glitch-free order to run all downstream thunks). When the frame completes, now we are idle and the m/seed is "ready", a ready notification will propagate without computing anything yet (backpressure) and then a consumer will eventually pull a value through, triggering the next propagation frame. Leo will need to confirm this has no errors

Richie22:07:27

Thank you! That makes sense to me.

Dustin Getz22:07:54

> https://clojurians.slack.com/archives/CL85MBPEF/p1656887941592819?thread_ts=1656873299.996749&amp;cid=CL85MBPEF I think it's wrong to say the reactor can be "forked", the reactor is really the "entrypoint" - it's the unsafePerformIO for process monad. The reactor holds the process state and implements the forking

Richie23:07:38

Right. Since I can't fork the forker then I have to catch the Cancelled. Sorry, I was sloppy with my wording there.

Dustin Getz23:07:21

Ah, I understand your wording now

Richie23:07:47

Is (being able to catch the inner exception at the outer level) breaking your mental model of a process monad? Am I understanding that correctly? If fork is map and join is flatten then, while I don't know where throw fits in, I think I can understand the confusion of throw skipping the bind (flatMap). I would expect the exception to be in the context so that it could be an F that's in a state of exception.

Richie23:07:34

Oh, is ap the forker and that's why I need to catch it inside the ap? I'm going with that.

Dustin Getz23:07:37

I do not understand your language in *2. As to *1, I understand m/ap as just syntax sugar (do-notation) for lower level flow primitives which have low-level notify/terminate dual callbacks. Any exceptions will propagate through the terminator callback chain all the way to the entrypoint (C-f "terminator" https://github.com/leonoel/flow). try/catch inside m/ap is perhaps the most convenient syntax for intercepting a terminate signal

Dustin Getz23:07:38

So if a process forks, and the prior process frame is now dying, and we intercept the termination signal, and we redirect it to a successful result, presumably the process will still terminate, just with a value instead of an exception. upstream cleanup lifecycle hooks will still be called. right @U053XQP4S?

Richie03:07:50

Sorry. I learned a little about monads when I programmed some Scala. I think a monad interface in Haskell has fmap and bind although in Scala it's map and flatMap iirc. fmap takes a context and a function and applies the function in the context. (F[A], A -> B): F[B] bind also takes a context and a function and applies the function to the value in the context except it expects the function to itself return a context and it'll flatten the two contexts together so that it doesn't come out nested. (F[A], A -> F[B]): F[B] You probably know better than me; I want to put it out there so that I can get corrected if I'm wrong. So, if the monadic context F is "process" then maybe fmap is "fork" and bind is "fork then join". In order to fit exceptions into the picture it could go in the context i.e. F. I may just be remembering https://zio.dev/datatypes/core/zio/ that did that. I thought that's the direction you were going in order to explain why exceptions wouldn't cross "monadic layers". The layers would still have to flatten though. Anyways. Idk.

Dustin Getz13:07:54

Leo and I discussed cancellation's interaction with m/?< (switch) and I have a much clearer understanding. Here is a test showing that m/ap crashes on switch. I proposed that this is not the desired behavior. My proposed behavior is that, once the m/ap switches and sends the cancellation signal, the stale child process should be orphaned as it dies and detached from the m/ap. Leo is going to think about it.

Dustin Getz13:07:25

If the behavior is changed, it means the child process death (terminal exception or value) would be contained by the parent; the exception would not escape the parent m/ap block and therefore not propagate to the entrypoint and the reactor would not see it

Dustin Getz13:07:21

Leo did not say he agrees with the proposal, he is going to think about it

Richie15:07:57

Ok, thanks for the update. Also, thank you for working through the issue and helping me to better understand the behavior.

🙂 1