Fork me on GitHub

In trying to understand continuous vs discrete flow, I wanted to sample a continuous flow at regular intervals. I thought the m/sample method would let me do that, but somewhere my understanding is halting because the following code never returns. Where am I wrong?

;; A task for the current time
  (def current-time (m/sp (System/currentTimeMillis)))
  ;; A continous flow of the current time (lazy sampling?)
  (def time-flow (m/ap (m/? (m/?> (m/seed (repeat current-time))))))

  (defn sleep-emit [delays]
    (m/ap (let [n (m/?> (m/seed delays))]
            (m/? (m/sleep n n)))))

  ;; A discrete flow
  (def sampler-flow (sleep-emit [1000 1001 1002]))

  ;; Trying to sample time-flow
  ;; WARN: Never returns 
  (m/? (m/reduce conj (m/sample vector


Why it doesn't work is a bit subtle and badly documented. When an operator needs the current value of a continuous flow, it checks if it's ready to transfer. If not, the previous value is reused, otherwise a fresh value is transferred. If the continuous flow becomes ready again immediately after the transfer, the transferred value is considered already stale so another value is transferred again and so on until the continuous flow is left in a non-ready state. The continuous flow you passed to sample produces values indefinitely as long as it's been asked for, therefore the sampling goes into an infinite loop. Consider this version. When this flow is being sampled, it returns the current time and waits for 1 millisecond before becoming ready again.

(def time-flow
  (m/latest (fn [_] (System/currentTimeMillis))
      (loop []
        (m/amb> nil
          (do (m/? (m/sleep 1))

🙏 1

That works, but it doesn’t help me understand. I think I get discrete flows, park until a value is available and then publish it. Like sleep-emit . For continuous flows I read that they can signal availability and then lazily sample but I don’t understand what functions in missionary corresponds to that. How can I create a continuous flow? How can I know when it has a value? How can I sample it?


(and thank you for trying to help me to understand!)


Is there anything that is different between a continuous flow and a discrete flow, or is it just about how you use them?


I’m sorry if these are stupid questions. I’ve read all that is available in the github repo and on the wiki but I seem to fail to get a good mental model of how the pieces work.


Continuous vs discrete is mostly about usage. They share the same protocol, continuous flows just have the additional constraint to be initially defined. It's always OK to pass a continuous flow to an operator expecting a discrete flow, you will get the successive states sampled at a rate defined by the backpressure strategy of the operator. The opposite is generally true and will work most of the time, but doesn't always make sense.

🙏 1

That makes sense to me. Thank you! To see if my understanding of that translates to code, would this be an example of a flow that is not initially defined?

(m/? (m/reduce conj (m/ap (m/? (m/sleep 1000 :hello)))))
and would this be a valid way to make it initially defined:
(m/? (m/reduce conj (m/ap (m/amb> nil (m/? (m/sleep 1000 :hello))))))



✔️ 1