#missionary
2024-05-02
Andrew Wilcox04:05:33

Suppose we want to add a delay to a flow. We want each item from the input flow to be produced in the output flow [edit: in the same order] after a specific delay from when that item was produced by the input flow. Here's a simple but buggy version:

(defn add-delay [ms f]
  (m/ap
   (let [v (m/?> ##Inf f)]
     (m/? (m/sleep ms))
     v)))
The problem is that if values are produced in the input flow synchronously (produced "at the same time", so to speak), this version of add-delay will produce them out of order. A different strategy is to add a timestamp to the incoming values, and then when we're producing the values we can sleep for the remaining time before the value should be produced. For this we want a monotonic clock (a clock that doesn't change if the system date/time is modified). Java provides a monotonic clock with System.nanoTime() and JavaScript with performance.now(). The Java clock value is a long in nanoseconds, and to avoid numerical overflow the only operation we want to do with the value is to subtract two clock values to get an elapsed time. See https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/System.html#nanoTime(). So, in our code, we want to treat it as an opaque value aside from calculating a time difference. They're in different units anyway, since the JavaScript clock value is in milliseconds.
(defn monotonic-now []
  #?(:clj (System/nanoTime)
     :cljs (js/performance.now)))

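;; Return the elapsed time between `t1` and `t2` in milliseconds.
;; On the JVM, unchecked-subtract wraps like Java's long subtraction;
;; Clojure's plain `-` would throw on long overflow instead.
(defn time-diff [t1 t2]
  #?(:clj (/ (unchecked-subtract (long t2) (long t1)) 1e6)
     :cljs (- t2 t1)))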

(defn timestamp [>f]
  (m/ap
   (let [v (m/?> >f)]
     [(monotonic-now) v])))
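
A small JVM-only sketch of the overflow point made above, not part of the original message. It assumes the clock values are longs from System/nanoTime; `elapsed-nanos`, `before-wrap`, and `after-wrap` are hypothetical names for illustration. Java's long subtraction wraps silently, so the difference of two readings stays correct even if the counter wraps between them, whereas Clojure's checked `-` throws in that case.

```clojure
;; Elapsed nanoseconds between two System/nanoTime readings.
;; The (long ...) casts make unchecked-subtract use primitive,
;; wrapping arithmetic.
(defn elapsed-nanos
  [t1 t2]
  (unchecked-subtract (long t2) (long t1)))

;; Simulated readings on either side of a counter wrap-around.
(def before-wrap (- Long/MAX_VALUE 5))
(def after-wrap (+ Long/MIN_VALUE 4))

(elapsed-nanos 1000 4000)              ;; => 3000
(elapsed-nanos before-wrap after-wrap) ;; => 10

;; By contrast, (- after-wrap before-wrap) throws ArithmeticException.
```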
With the timestamp of when the value was produced by the input flow, here's an imaginary implementation of mdelay (`m/buffer` doesn't actually take ##Inf)... but of course an infinite buffer would be easy to implement, such as with an m/mbx.
(defn mdelay [ms >f]
  (m/ap
   (let [[ts v] (m/?> (m/buffer ##Inf (timestamp >f)))]
     (m/? (m/sleep (- ms (time-diff ts (monotonic-now))) v)))))
(If you want to try this code, simply replace the ##Inf with a large integer such as 1000000, larger than the number of values you'd expect to need to buffer.) The question of backpressure is an interesting one. We have to have a buffer because during the delay period, the input flow could produce as many values as it wanted before we found out after the delay whether the consumer of the output flow was taking any values or not. But, on the other hand, once the consumer was applying backpressure and not taking values, we wouldn't have to accept more values from the input flow (if we didn't want to) until the buffer had drained. On the third hand, maybe it doesn't actually make sense to apply backpressure to the input flow when the consumer is currently slow, since we don't know if the consumer will be slow after the delay period. So far at least, the places where I plug in a delay are things like timeouts, where backpressure is already handled by the primary stream.

Eric Dvorsak08:05:08

if I understand the idea correctly you could even check the timediff and emit the value immediately without sleep if the timediff is 0?

Andrew Wilcox08:05:57

Yes, 0 or negative. (With a slow consumer we might be past the delay time when the consumer is ready to take the value. Thus the delay time is really a "no earlier than" time.)
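
A minimal sketch of that check, added for illustration and not taken from the thread. `remaining-ms` is a hypothetical helper: given the requested delay and the time already elapsed since the value was timestamped (both in milliseconds), it returns how long is still left to wait, clamped at zero so that an overdue value is emitted right away.

```clojure
;; Milliseconds still to wait before a value may be emitted.
;; Never negative: zero means "emit now, no sleep needed".
(defn remaining-ms
  [delay-ms elapsed-ms]
  (max 0 (- delay-ms elapsed-ms)))

(remaining-ms 100 30)  ;; => 70, still 70 ms to wait
(remaining-ms 100 100) ;; => 0, exactly on time
(remaining-ms 100 250) ;; => 0, slow consumer, already overdue
```

A caller could skip the sleep entirely when the result is not `pos?`.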

Eric Dvorsak08:05:19

the first add-delay is unordered but not really buggy, for instance it would allow you to have arbitrary delays for each value

Eric Dvorsak08:05:42

in which case it makes sense that the values don't necessarily come out in the same order

Andrew Wilcox08:05:52

Buggy with respect to a specification that the output flow would come out in the same order as the input flow, just delayed; but, like you say, not buggy with respect to a specification that doesn't specify that it needs to be ordered. Since I didn't actually specify that it should be ordered, I could say the bug was in my specification 😉

Andrew Wilcox05:05:02

Oh, ClojureScript has system-time, which uses performance.now when available. https://cljs.github.io/api/cljs.core/system-time

Eric Dvorsak12:05:15

Has anyone thought about what a missionary based equivalent of component/mount/integrant would be like?

Andrew Wilcox23:05:05

What is an integrant?

Andrew Wilcox06:05:50

Have you tried Electric Clojure? https://electric.hyperfiddle.net/ https://hyperfiddle.github.io/#/page/Contents It streams reactive state between client and server, with mount/unmount of components. It's based on Missionary.

Eric Dvorsak07:05:17

@U05P1LZ8EQH yes I've toyed with it, it has a lifecycle concept but I was thinking about something more focused, Electric's primary use case is to make a cross-network reactive application

Andrew Wilcox07:05:33

Well, in Missionary, you can use try...finally with ap, cp, or sp:

(m/ap
  (try
    (println "mount")
    ...
    (finally
      (println "unmount"))))
For state, you might consider using a continuous flow.
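
As an illustrative sketch of the try...finally pattern above (not from the thread), here is a runnable JVM version using m/sp. It assumes missionary is on the classpath and aliased as `m`; `log` and `component` are hypothetical names, and the short sleep merely stands in for the component's real work.

```clojure
(require '[missionary.core :as m])

;; Records lifecycle events so the ordering can be inspected afterwards.
(def log (atom []))

;; A task that "mounts", does some work, and always "unmounts",
;; whether the body completes, fails, or is cancelled.
(def component
  (m/sp
    (try
      (swap! log conj :mount)
      (m/? (m/sleep 10)) ; placeholder for real work
      :done
      (finally
        (swap! log conj :unmount)))))

;; On the JVM, m/? outside a coroutine block runs the task and blocks
;; the calling thread until it completes.
(def result (m/? component))
```

After this runs, `result` should be `:done` and `@log` should be `[:mount :unmount]`.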

Piotr Roterski14:05:36

@U03K8V573EC I'm not convinced runtime-state-management-system (however you might call it) needs to be tied to missionary and/or electric. From backend system point of view, electric is just a middleware on http-server/router process that you can manage with whatever works best for you, so probably either one of mount/integrant/component could work. I like https://github.com/donut-party/system the most and it plays quite well with Electric applications in my experience.