Hi, suppose for the sake of an example I have an epocher> flow that outputs unix epoch timestamps in ms every sec. How can I transform this flow to show the difference between two consecutive timestamps? thanks
(def epocher> (m/ap
(loop []
(m/amb (.getTime (js/Date.))
(do (m/? (m/sleep 1000))
(recur))))))
((m/reduce #(println :v %2) nil (->> epocher>
(m/eduction (take 3))))
#(println :done %) #(println :failed %))
;; :v 1716654854791
;; :v 1716654855801
;; :v 1716654856801
;; :done nil
(def epocher-delta> ??uses epocher> from above??)
((m/reduce #(println :v %2) nil (->> epocher-delta>
(m/eduction (take 2))))
#(println :done %) #(println :failed %))
;; :v 1010
;; :v 1000
;; :done nilSounds like m/reductions would also work. If you don't want the first value you can add a (m/eduction (drop 1))
Hi @xifi. What exactly do you have in mind with m/reductions? The naive approach won't work because we are subtracted the previous calculated value rather than previous item, for example
(def epocher> (m/ap
(loop [cnt 1]
(m/amb cnt
(recur (* cnt 2))))))
((m/reduce #(println :v %2) nil (->> epocher>
(m/eduction (take 4))))
#(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil
(def epocher-delta> (->> (m/reductions (fn [prv nxt]
(- nxt prv)) 0 epocher>)
(m/eduction (drop 1))))
((m/reduce #(println :y %2) nil (->> epocher-delta>
(m/eduction (take 3))))
#(println :done %) #(println :failed %))
;; :y 1
;; :y 1
;; :y 3
;; :done nilYou can return [t diff] and eduction map second
Right, this works too as expected. I find this solution a bit verbose compared to xforms, though I assume both are equally efficient due to use of transformers. Initially, I hoped for a straightforward solution to retrieve n values from a flow in m/ap while forking, without needing to track state in an atom or volatile.This discussion made me realize I need to think about flow transformations in terms of transformers. thanks
(def epocher> (m/ap
(loop [cnt 1]
(m/amb cnt
(recur (* cnt 2))))))
((m/reduce #(println :v %2) nil (->> epocher>
(m/eduction (take 4))))
#(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil
(def epocher-delta> (->> (m/reductions (fn [[prv _ :as x] nxt]
[nxt (- nxt prv)]) [0 nil] epocher>)
(m/eduction (map second) (drop 2))))
((m/reduce #(println :y %2) nil (->> epocher-delta>
(m/eduction (take 3))))
#(println :done %) #(println :failed %))
;; :y 1
;; :y 2
;; :y 4
;; :done nil
Naively, something with eduction and transducers. partition-all (from first naive solution) may not be enough to deal with overlapping pairs, but https://github.com/cgrand/xforms can help with that 😀
(m/eduction (x/partition 2 1)
(map #(- (nth % 1) (nth % 0)))
epocher>)
Edit: Rewrote the solution with xforms insteadRight, I'm looking for consecutive diffs, the second minus the first, the third minus the second etc, something like (partition 2 1) as you nicely pointed in the updated solution. Just for the record with regards to the original solution, the partition-all will return the diffs of pairs the second minus the first, the fourth minus the third etc.
(def epocher> (m/ap
(loop [cnt 1]
(m/amb cnt
(recur (* cnt 2))))))
((m/reduce #(println :v %2) nil (->> epocher>
(m/eduction (take 4))))
#(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil
(def epocher-delta> (m/eduction (partition-all 2)
(map #(when (= 2 (count %))
(- (nth % 1) (nth % 0))))
epocher>))
((m/reduce #(println :y %2) nil (->> epocher-delta>
(m/eduction (take 2))))
#(println :done %) #(println :failed %))
;; :y 1
;; :y 4
;; :done nil
let me know try next with the updated solution (I wasn't aware of the xforms lib)Yes the first solution with partition-all was too naive and did not take into account the overlapping pair. Thus the use of https://cljdoc.org/d/net.cgrand/xforms/0.19.6/api/net.cgrand.xforms which is THE lib for more transducers and reducing functions 😀
and it does indeed work as expected. I need to read through xforms which seams to be the right tool for performing advanced flow transformations. Thanks for bringing it to my attention.
By the way, could there also be a potential solution using m/ap? I was struggling to come up with one that would keep track of the last item retrieved from the epocher> flow while forking to the next item
(def epocher> (m/ap
(loop [cnt 1]
(m/amb cnt
(recur (* cnt 2))))))
((m/reduce #(println :v %2) nil (->> epocher>
(m/eduction (take 4))))
#(println :done %) #(println :failed %))
;; :v 1
;; :v 2
;; :v 4
;; :v 8
;; :done nil
(def epocher-delta> (m/eduction (x/partition 2 1)
(map #(- (nth % 1) (nth % 0)))
epocher>))
((m/reduce #(println :y %2) nil (->> epocher-delta>
(m/eduction (take 3))))
#(println :done %) #(println :failed %))
;; :y 1
;; :y 2
;; :y 4
;; :done nilWith vanilla m/ap you must keep track of the previous consumed value in some state
(defn epocher-delta> [flow]
(let [!prev (atom ::empty)]
(m/ap (let [v (m/?> flow)
prev @!prev]
(reset! !prev v)
(if (= ::empty prev)
(m/amb)
(- v prev))))))Right, thanks!