missionary

Hendrik 2024-03-27T19:24:58.794559Z

I want to delay a flow. What I want to achieve is roughly this: I want to wait for 1 second before the first element is taken from upstream and immediately sent downstream. Then I want to wait for another second before the next value is taken from upstream and sent downstream, and so on. How can I achieve this?

pez 2024-03-27T20:35:56.304139Z

Is this what throttle in Electric is doing, maybe?

(cc/defn throttle [dur >in]
  (m/ap
    (let [x (m/?> (m/relieve {} >in))]
      (m/amb x (do (m/? (m/sleep dur)) (m/amb))))))

Hendrik 2024-03-27T21:01:23.427089Z

Can’t test it right now (writing from phone). I think this comes close, but it does not wait before the first value is transferred downstream.

Hendrik 2024-03-27T21:03:18.502819Z

I need this : wait -> transfer -> wait -> transfer

Dustin Getz (Hyperfiddle) 2024-03-27T22:04:07.261919Z

i believe m/ap + loop/recur + m/amb + m/sleep

Dustin Getz (Hyperfiddle) 2024-03-27T22:04:50.925379Z

m/ap is sequential so you can just sleep first

2024-03-29T11:19:21.813909Z

Hi, I was playing with OP's idea of waiting before consuming. In my test it appears that if you sleep before consuming for the first time, your input flow will not be started. You have to fork it at least once to make it flow! So maybe OP's idea just does not make sense when talking about flow consumption 🤔 Are my test and my conclusion right?

Hendrik 2024-03-29T13:50:41.085589Z

I guess you are right.

leonoel 2024-03-29T14:16:34.689259Z

sleep before fork will just delay the initialization of input flow, IMO it matches the problem statement

Hendrik 2024-03-28T07:19:19.417699Z

Thanks for your help. I found a solution.

(defn delayed [delay >input]
  (m/ap
   (m/? (m/sleep delay))
   (let [x (m/?> >input)]
     (m/amb
      x
      (let [_ (m/? (m/sleep delay))]
        (m/amb))))))
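
A minimal usage sketch for the function above, assuming JVM Clojure with missionary on the classpath. The m/seed input and the sample vector are illustrative and not part of the original message; the definition is repeated only so the snippet is self-contained:

```clojure
(require '[missionary.core :as m])

;; Same logic as `delayed` above: sleep, take one value, emit it, repeat.
(defn delayed [delay >input]
  (m/ap
    (m/? (m/sleep delay))                 ; wait before the input flow is started
    (let [x (m/?> >input)]                ; fork on each upstream value
      (m/amb
        x                                 ; emit the value downstream
        (do (m/? (m/sleep delay))         ; wait before the next value is taken
            (m/amb))))))                  ; emit nothing for this branch

;; Blocking m/? outside a process block is JVM-only.
;; Collects the delayed values into a vector.
(m/? (m/reduce conj [] (delayed 1000 (m/seed [1 2 3]))))
;; => [1 2 3]
```

With a 1000 ms delay the call should take a few seconds to return, since one sleep runs before each value and one more after the last.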

Hendrik 2024-03-28T08:31:46.066779Z

This fails if used in electric. I posted a new question in the hyperfiddle channel.

xificurC 2024-03-28T08:36:37.508669Z

in electric all flows need to be initialized, i.e. they need to return a value right away. Typical initial values are nil or Pending

xificurC 2024-03-28T08:37:32.249469Z

you can wrap the ap code with (m/amb= nil/pending ..)

Hendrik 2024-03-28T08:46:16.484979Z

I know that the flow must be initialized for electric. Therefore I use it in electric like this:

(->> flow
  (m/relieve {})
  (delayed 1000)
  (m/reductions {} nil)) ; does not work, fails

Wouldn’t that initialize the flow from electric’s point of view?

xificurC 2024-03-28T08:51:18.740319Z

is that literally the code you use? m/reductions

Hendrik 2024-03-28T08:51:58.984029Z

sorry, typo. I use m/reductions

Hendrik 2024-03-28T08:53:04.899979Z

I also tried:

(defn delayed [delay >input]
  (m/ap
   (m/amb=
    nil
    (m/?> (m/ap
           (m/? (m/sleep delay))
           (let [x (m/?> >input)]
             (m/amb
              x
              (let [_ (m/? (m/sleep delay))]
                (m/amb)))))))))

This works in missionary (m/? (m/reduce ….)), but fails in electric with the same error.

Hendrik 2024-03-28T08:55:53.016029Z

I do not think this error is missionary-related; it seems specific to electric. I posted the question and error stacktrace there: https://clojurians.slack.com/archives/C7Q9GSHFV/p1711615083448719