#missionary
2024-03-27
Hendrik19:03:58

I want to delay a flow. Roughly, I want to wait 1 second before the first element is taken from upstream, then immediately send it downstream. Then I want to wait another second before the next value is taken from upstream and sent downstream, and so on. How can I achieve this?

pez20:03:56

Is this what throttle in Electric is doing, maybe?

(cc/defn throttle [dur >in]
  (m/ap
    (let [x (m/?> (m/relieve {} >in))]
      (m/amb x (do (m/? (m/sleep dur)) (m/amb))))))

Hendrik21:03:23

Can’t test it right now (writing from my phone). I think this comes close, but it does not wait before the first value is transferred downstream.

Hendrik21:03:18

I need this : wait -> transfer -> wait -> transfer

Dustin Getz22:03:07

i believe m/ap + loop/recur + m/amb + m/sleep

Dustin Getz22:03:50

m/ap is sequential so you can just sleep first

Hendrik07:03:19

Thanks for your help. I found a solution.

(defn delayed [delay >input]
  (m/ap
   (m/? (m/sleep delay))
   (let [x (m/?> >input)]
     (m/amb
      x
      (let [_ (m/? (m/sleep delay))]
        (m/amb))))))
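
A minimal sketch of how the `delayed` flow above could be checked at a JVM REPL, assuming `missionary.core` is on the classpath and aliased as `m`. The `timed-run` helper is hypothetical (not part of missionary or this thread); it records how many milliseconds have elapsed when each value arrives. `delayed` is restated only so the snippet runs on its own.

```clojure
(require '[missionary.core :as m])

;; `delayed` as posted above: wait, transfer, wait, transfer, ...
(defn delayed [delay >input]
  (m/ap
    (m/? (m/sleep delay))
    (let [x (m/?> >input)]
      (m/amb x (do (m/? (m/sleep delay)) (m/amb))))))

;; Hypothetical helper: runs the flow to completion and collects
;; [elapsed-ms value] pairs. Calling m/? outside a process block
;; blocks the calling thread, which only works on the JVM.
(defn timed-run [delay coll]
  (let [t0 (System/currentTimeMillis)]
    (m/? (m/reduce (fn [acc x]
                     (conj acc [(- (System/currentTimeMillis) t0) x]))
                   []
                   (delayed delay (m/seed coll))))))

(timed-run 100 [:a :b :c])
```

The values should come back in order as `:a`, `:b`, `:c`, with the elapsed times roughly one `delay` apart and the first one no earlier than about one `delay` after the start.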

Hendrik08:03:46

This fails if used in Electric. I posted a new question in the hyperfiddle channel.

xificurC08:03:37

in electric all flows need to be initialized, i.e. they need to return a value right away. Typical initial values are nil or Pending

xificurC08:03:32

you can wrap the ap code with (m/amb= nil/pending ..)

Hendrik08:03:16

I know that the flow must be initialized for Electric. That is why I use it in Electric like this:

(->> flow
  (m/relieve {})
  (delayed 1000)
  (m/reductions {} nil)) ; does not work, fails

Wouldn’t that initialize the flow from Electric’s point of view?

xificurC08:03:18

is that literally the code you use? m/reductions

Hendrik08:03:58

Sorry, typo. I use m/reductions

Hendrik08:03:04

I also tried:

(defn delayed [delay >input]
  (m/ap
   (m/amb=
    nil
    (m/?> (m/ap
           (m/? (m/sleep delay))
           (let [x (m/?> >input)]
             (m/amb
              x
              (let [_ (m/? (m/sleep delay))]
                (m/amb)))))))))
This works in missionary (m/? (m/reduce ….)), but fails in electric with the same error.

Hendrik08:03:53

I do not think this error is missionary-related; it seems specific to Electric. I posted the question and error stacktrace there: https://clojurians.slack.com/archives/C7Q9GSHFV/p1711615083448719

kawas11:03:21

Hi, I was playing with OP's idea of waiting before consuming. In my test it appears that if you sleep before consuming for the first time, your input flow will not be started/flowing. You have to fork it at least once to make it flow! So maybe OP's idea just does not make sense when talking about flow consumption :thinking_face: Are my test and my conclusion right?

Hendrik13:03:41

I guess you are right.

leonoel14:03:34

sleep before fork will just delay the initialization of input flow, IMO it matches the problem statement
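
A small sketch of the point about initialization, assuming a JVM REPL with `missionary.core` aliased as `m`. The names `started-at`, `input`, `t0`, `result` and `startup-lag` are made up for illustration. The input flow records the moment it is actually started, so the lag between subscribing to the outer flow and the input starting can be inspected.

```clojure
(require '[missionary.core :as m])

(def started-at (atom nil))

;; Hypothetical input flow that records when it is started,
;; then emits 1, 2, 3.
(def input
  (m/ap
    (reset! started-at (System/currentTimeMillis))
    (m/?> (m/seed [1 2 3]))))

(def t0 (System/currentTimeMillis))

;; Sleep first, fork afterwards: `input` is only started once the
;; sleep has completed.
(def result
  (m/? (m/reduce conj (m/ap (m/? (m/sleep 200)) (m/?> input)))))

;; Milliseconds between subscribing and `input` being started.
(def startup-lag (- @started-at t0))
```

Under these assumptions `result` should hold all three values, and `startup-lag` should be close to the 200 ms sleep, which suggests the input is not touched at all until the first fork.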