This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-04-10
I was wondering if there might be a simpler or more elegant way of doing this... Every `ms` milliseconds, `batch` produces a vector of all of the values that the input flow produced during the time interval, if the input flow produced any values during that time. For example, if the interval is set to one minute and the input flow produces three items in the first minute, after one minute `batch` will produce a vector of those three items. After two minutes, `batch` will produce a vector of any items that came in during the second minute, if any items were produced; and so on. If the input flow terminates, `batch` immediately produces a vector of the remaining items (if there are any remaining items) without waiting for the end of the time interval, and terminates.
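To make the intended behavior concrete, here is a rough offline model in plain Clojure (no missionary involved). It is only a sketch under assumptions: events are given up front as `[timestamp-ms value]` pairs sorted by time, windows are aligned to multiples of `ms`, and `batches-by-window` is a hypothetical name used purely for illustration.

```clojure
;; Offline model of the batching rule: group values by the time window
;; they fall into, and emit nothing for windows that received no values.
(defn batches-by-window
  [ms events]
  (->> events
       ;; Consecutive events with the same window index belong together.
       (partition-by (fn [[t _]] (quot t ms)))
       ;; Keep only the values, dropping the timestamps.
       (mapv (fn [window] (mapv second window)))))

;; Three items in the first minute, none in the second, one in the third.
(batches-by-window 60000 [[1000 :a] [20000 :b] [59000 :c] [130000 :d]])
;; => [[:a :b :c] [:d]]
```

The empty second minute produces no vector at all, which matches the "if any items were produced" part of the description.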
I struggled a bit to handle termination. I ended up using a dfv to signal when the input flow had terminated:
;; Assumes (:require [missionary.core :as m]) and (:import (missionary Cancelled)).
(defn batch [ms >f]
  (let [terminated (m/dfv)
        !accum (atom [])]
    (letfn [(add-item [x] (swap! !accum conj x))
            (take-items [] (first (swap-vals! !accum (constantly []))))]
      (m/ap
        (m/amb=
          ;; Add items from the input flow to `!accum` as they come
          ;; in; assign `terminated` when the input flow terminates.
          (m/amb
            (do (add-item (m/?> >f))
                (m/amb))
            (do (terminated nil)
                (m/amb)))
          ;; Process until `terminated` is assigned.
          (if (m/?< (m/ap (m/amb true (do (m/? terminated) false))))
            ;; Not yet terminated; produce items every `ms`
            ;; milliseconds. Will be canceled if terminated is assigned
            ;; during the sleep.
            (try
              (loop []
                (m/? (m/sleep ms))
                (let [items (take-items)]
                  (if (= items [])
                    (recur)
                    (m/amb items (recur)))))
              (catch Cancelled _ (m/amb)))
            ;; The input flow has terminated; produce the final batch of items.
            (let [items (take-items)]
              (if (= items [])
                (m/amb)
                (m/amb items)))))))))
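The `take-items` helper relies on `swap-vals!`, which returns a pair of the old and new atom values. A minimal standalone sketch of that drain idiom, outside of any flow (the top-level `def`s are only for illustration):

```clojure
;; Standalone copy of the accumulator helpers, for illustration only.
(def !accum (atom []))

(defn add-item [x] (swap! !accum conj x))

;; swap-vals! returns [old-value new-value]. Taking `first` yields the
;; items that were buffered, while the atom is reset to [] in the same
;; atomic step, so an item added concurrently is neither lost nor
;; returned twice.
(defn take-items [] (first (swap-vals! !accum (constantly []))))

(add-item 1)
(add-item 2)
(def drained (take-items))       ; the items buffered so far
(def second-drain (take-items))  ; nothing left after the first drain
```

Since the new value is a constant, `(first (reset-vals! !accum []))` should behave the same way.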
You could do early termination with `eduction` and `reductions`. I managed to implement the requirement with a transducer:
;;;
(defn interval [ms v]
  (let [now #(System/currentTimeMillis)]
    (m/ap (m/? (m/sleep (- (m/?> (m/seed (next (iterate (partial + ms) (now)))))
                           (now)) v)))))

(defn batch [ms >f]
  (let [>g (m/ap (m/amb (m/?> >f) ::end))
        >s (interval ms ::sep)]
    (->> (m/ap (m/?> (m/?> 2 (m/seed [>g >s]))))
         (m/eduction
           (fn [rf]
             (let [!acc (atom [])]
               (completing
                 (fn [r i]
                   (case i
                     ::sep (rf r (first (reset-vals! !acc [])))
                     ::end (reduced (rf r (first (reset-vals! !acc []))))
                     ;; Buffer the item and return the accumulator unchanged.
                     (do (swap! !acc conj i) r))))))))))
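The transducer part can be tried without missionary by feeding it a plain collection. The following is only a sketch of the same idea: `batching-xf` is a hypothetical name, and `:batch/sep` and `:batch/end` stand in for the `::sep` and `::end` markers so the example does not depend on the current namespace.

```clojure
;; Stateful transducer: buffers items until a separator arrives, then
;; emits the buffer as one vector. The end marker flushes and stops.
(defn batching-xf
  [rf]
  (let [!acc (atom [])]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (case i
         :batch/sep (rf r (first (reset-vals! !acc [])))
         ;; ensure-reduced avoids double-wrapping if rf already reduced.
         :batch/end (ensure-reduced (rf r (first (reset-vals! !acc []))))
         ;; Ordinary item: buffer it and pass the accumulator through.
         (do (swap! !acc conj i) r))))))

(into [] batching-xf [1 2 :batch/sep 3 :batch/sep :batch/end 4])
;; => [[1 2] [3] []]
```

Two things show up here: the `4` after the end marker is never processed, which is the early termination, and a separator or end marker arriving with an empty buffer emits an empty vector. If empty batches are unwanted, one option might be to check the drained vector with `seq` before calling `rf`.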
Same solution as @U027DJ7CPPU but using more transducers with `eduction` and a few helper functions to make it a little bit more readable 😉
`m/?=` is deprecated in favor of `m/?> ##Inf`, or there's the sugar `(m/amb= (m/?> flow1) (m/?> flow2))` for mixing a known number of flows