missionary

awb99 2025-04-07T21:21:13.602489Z

Is there a way to cancel a flow that is created with m/watch (on an atom)? My usecase is progress updates of a long running task. After the task finished the watch to the atom that m/watch created needs to be Cancelled, and the progress consumer should get the end of the flow.

👀 1
awb99 2025-04-08T14:44:17.666459Z

Thanks @leonoel I want to generalize the canceling of the flow. Is it safe to put m/Cancelled into the atom (reset! a (Cancelled.)) So that the take-while can stop when it does receive Cancelled. I guess it is safe to do so, as missionary only throws Cancelled; so in my usecase it would be a Cancel that is not thrown. As to the switch I dont know what you mean.

leonoel 2025-04-08T14:55:13.448439Z

This is a switch. When the conditional becomes true, the else branch is unmounted, cancelling the watch

(def !task-result (atom nil))
(def !task-progress (atom 0))

((m/reduce (fn [_ x] (prn x)) nil
   (m/cp (if-some [r (m/?< (m/watch !task-result))]
           r (m/?< (m/watch !task-progress)))))
 prn prn)

(reset! !task-progress 30)
(reset! !task-progress 60)
(reset! !task-result :ok)

🙏 1
leonoel 2025-04-08T16:07:04.372829Z

It is OK to use a Cancelled instance as a sentinel as long as it doesn't introduce any ambiguity

leonoel 2025-04-08T16:08:19.535989Z

I have considered having m/watch terminate when the atom state is a reduced?

awb99 2025-04-08T16:43:23.819319Z

Thanks again @leonoel reduced? is perhaps even better than Cancelled. I love the switch syntax example. But to be honst, I still dont undertand the m/?< operator. Can I think of it this way: This operator only works on continuous processes. Since the entire expression works with continuous flows, this means that all received values are always available, and therefore I should not understand it like a discrete flow that has updates, but rather as something that always exists. And the m/?< is just something that activates a watch. Or perhaps I need to think in terms of "execution contexts" that are created when there is a change. I am using missionary now for a year, and still m/?< is what I dont get. 🙂

leonoel 2025-04-08T18:50:39.636229Z

the continuous flow abstraction captures the idea of having a current state, ignoring how this state is effectively mutated m/?< means "give me the current state", like a reactive deref

awb99 2025-04-08T19:19:39.128999Z

Thanks @leonoel Are the following statements true? 1. m/?< can only be used on continuous flows. (So using it on a discrete flow will not work), 2. If both m/?< and m/?> are used in a m/cp or m/ap, then effectively m/?< can be used as a switch, and then then only one of multiple m/?> could be executed (so in other words they would not be actively consumed.

leonoel 2025-04-08T19:20:54.642049Z

1. true

leonoel 2025-04-08T19:41:39.475619Z

2. I would rephrase it - everything after a m/?< depends on the current state, therefore the rest of the computation (the continuation, in literature) must be reconstructed when the current state changes. If the previous computation depends on another effect (i.e. a m/? or m/?> or another m/?<) and this effect is still alive on state change, it is cancelled.

awb99 2025-04-08T20:31:25.207969Z

The previous computation? Do this forms leads to the same result? 1. (m/cp (let [a (m/?< a-flow) b (m/?< b-flow) c (m/?< c-flow)] (vec a b c)) 2. (m/latest vec a-flow b-flow c-flow). Or would a-flow and b-flow be started/stopped when c-flow changes?

leonoel 2025-04-09T05:24:10.997709Z

in 1, b is after a and c is after b so c-flow is reconstructed when either b or a changes, b-flow is reconstructed when a changes, and a-flow lives forever

leonoel 2025-04-09T05:39:40.958229Z

In this case, 1 and 2 lead to the same result but only because there are no side-effects. missionary coroutines follow the same evaluation rules as clojure, i.e. expressions are always evaluated sequentially even when their results do not depend on each other. Therefore, the ordering of effects is well defined and exceptions work as expected.

leonoel 2025-04-09T05:42:43.118479Z

In other words, if a b c are concurrent, you must state it explicitly with m/latest

leonoel 2025-04-08T06:00:22.226719Z

same as any other flow : consume it in a switch, or with eduction + take-while