missionary

chaos 2024-05-25T16:47:09.095429Z

Hi, suppose for the sake of an example I have an epocher> flow that outputs unix epoch timestamps in ms every sec. How can I transform this flow to show the difference between two consecutive timestamps? thanks

(def epocher> (m/ap
               (loop []
                 (m/amb (.getTime (js/Date.))
                        (do (m/? (m/sleep 1000))
                            (recur))))))

((m/reduce #(println :v %2) nil (->> epocher>
                                     (m/eduction (take 3))))
 #(println :done %) #(println :failed %))
;; :v 1716654854791
;; :v 1716654855801
;; :v 1716654856801
;; :done nil

(def epocher-delta> ??uses epocher> from above??)

((m/reduce #(println :v %2) nil (->> epocher-delta>
                                     (m/eduction (take 2))))
 #(println :done %) #(println :failed %))
;; :v 1010
;; :v 1000
;; :done nil

xificurC 2024-05-26T08:50:11.914999Z

Sounds like m/reductions would also work. If you don't want the first value you can add a (m/eduction (drop 1))

chaos 2024-05-26T10:04:58.883579Z

Hi @xifi. What exactly do you have in mind with m/reductions? The naive approach won't work because we would be subtracting the previously calculated value rather than the previous item, for example

(def epocher> (m/ap
                (loop [cnt 1]
                  (m/amb cnt
                         (recur (* cnt 2))))))

((m/reduce #(println :v %2) nil (->> epocher>
                                     (m/eduction (take 4))))
 #(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil

(def epocher-delta> (->> (m/reductions (fn [prv nxt]
                                         (- nxt prv)) 0 epocher>)
                         (m/eduction (drop 1))))

((m/reduce #(println :y %2) nil (->> epocher-delta>
                                     (m/eduction (take 3))))
 #(println :done %) #(println :failed %))
;; :y 1
;; :y 1
;; :y 3
;; :done nil

xificurC 2024-05-26T11:56:31.164469Z

You can return [t diff] and eduction map second

chaos 2024-05-26T20:59:26.553779Z

Right, this works too, as expected. I find this solution a bit verbose compared to xforms, though I assume both are equally efficient due to the use of transformers. Initially, I hoped for a straightforward solution to retrieve n values from a flow in m/ap while forking, without needing to track state in an atom or volatile. This discussion made me realize I need to think about flow transformations in terms of transformers. Thanks

(def epocher> (m/ap
                (loop [cnt 1]
                  (m/amb cnt
                         (recur (* cnt 2))))))

((m/reduce #(println :v %2) nil (->> epocher>
                                     (m/eduction (take 4))))
 #(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil

(def epocher-delta> (->> (m/reductions (fn [[prv _ :as x] nxt]
                                         [nxt (- nxt prv)]) [0 nil] epocher>)
                         
                         (m/eduction (map second) (drop 2))))

((m/reduce #(println :y %2) nil (->> epocher-delta>
                                     (m/eduction (take 3))))
 #(println :done %) #(println :failed %))
;; :y 1
;; :y 2
;; :y 4
;; :done nil
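
To make the difference between the two m/reductions attempts easier to see, here is a minimal trace on a plain vector using clojure.core/reductions. This is only a sketch: it assumes that m/reductions, like the core function, emits the initial value first (which is why the thread uses drop), and the names xs, naive, paired and deltas are made up for illustration.

```clojure
(def xs [1 2 4 8])

;; Naive version: the accumulator is the previous *difference*,
;; so each step subtracts a difference instead of the previous item.
(def naive
  (vec (reductions (fn [prv nxt] (- nxt prv)) 0 xs)))

;; Pair version: carry [previous-item difference] so the next step
;; still has access to the raw previous item.
(def paired
  (vec (reductions (fn [[prv _] nxt] [nxt (- nxt prv)]) [0 nil] xs)))

;; Keep only the differences, then drop the seed and the first item's
;; difference against the seed.
(def deltas
  (vec (drop 2 (map second paired))))

(println naive)  ;; [0 1 1 3 5]
(println paired) ;; [[0 nil] [1 1] [2 1] [4 2] [8 4]]
(println deltas) ;; [1 2 4]
```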

2024-05-25T20:35:57.978539Z

Naively, something with eduction and transducers. partition-all (from first naive solution) may not be enough to deal with overlapping pairs, but https://github.com/cgrand/xforms can help with that 😀

;; assumes (require '[net.cgrand.xforms :as x])
(m/eduction (x/partition 2 1)
            (map #(- (nth % 1) (nth % 0)))
            epocher>)
Edit: Rewrote the solution with xforms instead

chaos 2024-05-25T20:56:24.454389Z

Right, I'm looking for consecutive diffs: the second minus the first, the third minus the second, etc., something like (partition 2 1), as you nicely pointed out in the updated solution. Just for the record, with regard to the original solution, partition-all returns the diffs of disjoint pairs: the second minus the first, the fourth minus the third, etc.

(def epocher> (m/ap
                (loop [cnt 1]
                  (m/amb cnt
                         (recur (* cnt 2))))))

((m/reduce #(println :v %2) nil (->> epocher>
                                     (m/eduction (take 4))))
 #(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil

(def epocher-delta> (m/eduction (partition-all 2)
                                (map #(when (= 2 (count %))
                                        (- (nth % 1) (nth % 0))))
                                epocher>))

((m/reduce #(println :y %2) nil (->> epocher-delta>
                                     (m/eduction (take 2))))
 #(println :done %) #(println :failed %))
;; :y 1
;; :y 4
;; :done nil
Let me now try the updated solution (I wasn't aware of the xforms lib)
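
The two groupings discussed above can be compared on a plain vector with core functions only. This is a sketch rather than the flow-based code from the thread: clojure.core/partition with a step works on sequences but has no transducer arity, which is why the thread reaches for x/partition inside m/eduction. The names xs, pair-diff, disjoint-diffs and sliding-diffs are made up for illustration.

```clojure
(def xs [1 2 4 8])

(defn pair-diff
  "Second element of a pair minus the first."
  [[a b]]
  (- b a))

;; Disjoint pairs: (1 2) (4 8).
;; xs has an even count here, so every group is a full pair.
(def disjoint-diffs
  (mapv pair-diff (partition-all 2 xs)))

;; Sliding pairs with step 1: (1 2) (2 4) (4 8).
(def sliding-diffs
  (mapv pair-diff (partition 2 1 xs)))

(println disjoint-diffs) ;; [1 4]
(println sliding-diffs)  ;; [1 2 4]
```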

2024-05-25T20:59:04.878889Z

Yes the first solution with partition-all was too naive and did not take into account the overlapping pair. Thus the use of https://cljdoc.org/d/net.cgrand/xforms/0.19.6/api/net.cgrand.xforms which is THE lib for more transducers and reducing functions 😀

chaos 2024-05-25T21:09:11.031189Z

And it does indeed work as expected. I need to read through xforms, which seems to be the right tool for performing advanced flow transformations. Thanks for bringing it to my attention. By the way, could there also be a potential solution using m/ap? I was struggling to come up with one that would keep track of the last item retrieved from the epocher> flow while forking to the next item

(def epocher> (m/ap
                (loop [cnt 1]
                  (m/amb cnt
                         (recur (* cnt 2))))))

((m/reduce #(println :v %2) nil (->> epocher>
                                     (m/eduction (take 4))))
 #(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil

(def epocher-delta> (m/eduction (x/partition 2 1)
                                (map #(- (nth % 1) (nth % 0)))
                                epocher>))

((m/reduce #(println :y %2) nil (->> epocher-delta>
                                     (m/eduction (take 3))))
 #(println :done %) #(println :failed %))
;; :y 1
;; :y 2
;; :y 4
;; :done nil

2024-05-25T21:33:16.430069Z

With vanilla m/ap you must keep track of the previously consumed value in some state

(defn epocher-delta> [flow]
  (m/ap (let [!prev (atom ::empty) ; created before the fork, so each run of the flow gets fresh state
              v (m/?> flow)
              prev @!prev]
          (reset! !prev v)
          (if (= ::empty prev)
            (m/amb)
            (- v prev)))))
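
The same "remember the previous value" idea can also be packaged as a hand-written stateful transducer, without xforms. This is a minimal sketch: delta-xf is a hypothetical helper, not part of missionary or xforms, and it is exercised here on a plain vector with into.

```clojure
(defn delta-xf
  "Hypothetical helper: a stateful transducer that emits each input
  minus the previous input. The first input produces no output."
  []
  (fn [rf]
    ;; One volatile per transducing process, so state is not shared.
    (let [prev (volatile! ::none)]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc x]
         (let [p @prev]
           (vreset! prev x)
           (if (= ::none p)
             acc                    ; nothing to subtract from yet
             (rf acc (- x p)))))))))

(def all-deltas (into [] (delta-xf) [1 2 4 8]))
(def first-two (into [] (comp (delta-xf) (take 2)) [1 2 4 8]))

(println all-deltas) ;; [1 2 4]
(println first-two)  ;; [1 2]
```

Assuming m/eduction accepts any transducer, as it does with (take 3) earlier in the thread, this could presumably be applied to a flow as (m/eduction (delta-xf) epocher>).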

chaos 2024-05-25T21:35:50.797579Z

Right, thanks!