#missionary
2024-04-10
Andrew Wilcox 04:04:03

I was wondering if there might be a simpler or more elegant way of doing this. Every `ms` milliseconds, `batch` emits a vector of all the values the input flow produced during that interval, provided it produced any. For example, if the interval is one minute and the input flow produces three items in the first minute, then after one minute `batch` emits a vector of those three items. After two minutes, it emits a vector of the items that arrived during the second minute, if there were any, and so on. If the input flow terminates, `batch` immediately emits a vector of the remaining items (if any) without waiting for the end of the interval, and then terminates. I struggled a bit to handle termination. I ended up using a dfv to signal when the input flow had terminated:

;; Assumes (:require [missionary.core :as m]) and (:import [missionary Cancelled]).
(defn batch [ms >f]
  (let [terminated (m/dfv)
        !accum (atom [])]
    (letfn [(add-item [x] (swap! !accum conj x))
            (take-items [] (first (swap-vals! !accum (constantly []))))]
      (m/ap
       (m/amb=

        ;; Add items from the input flow to `!accum` as they come
        ;; in; assign `terminated` when the input flow terminates.
        (m/amb
         (do (add-item (m/?> >f))
             (m/amb))
         (do (terminated nil)
             (m/amb)))

        ;; Process until `terminated` is assigned.
        (if (m/?< (m/ap (m/amb true (do (m/? terminated) false))))
          ;; Not yet terminated; produce items every `ms`
          ;; milliseconds.  Will be canceled if terminated is assigned
          ;; during the sleep.
          (try
            (loop []
              (m/? (m/sleep ms))
              (let [items (take-items)]
                (if (= items [])
                  (recur)
                  (m/amb items (recur)))))
            (catch Cancelled _ (m/amb)))

          ;; The input flow has terminated; produce the final batch of items.
          (let [items (take-items)]
            (if (= items [])
              (m/amb)
              (m/amb items)))))))))
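As a reference point for what any `batch` implementation should emit, the grouping rule described above can be modeled without missionary as a pure function over timestamped items. This is only a sketch: `expected-batches` is a hypothetical helper, the `[timestamp-ms item]` pairs are an assumed input shape sorted by time, and intervals are assumed to start at time 0.

```clojure
;; Hypothetical helper, not part of missionary: a pure model of the batching rule.
;; `events` is assumed to be a time-ordered seq of [timestamp-ms item] pairs,
;; with intervals starting at time 0.
(defn expected-batches [ms events]
  (->> events
       ;; Consecutive items that fall in the same interval share a batch.
       (partition-by (fn [[t _]] (quot t ms)))
       ;; Keep only the items; intervals with no items yield no batch.
       (mapv (fn [group] (mapv second group)))))

(def one-minute-example
  (expected-batches 60000 [[1000 :a] [20000 :b] [59000 :c] [130000 :d]]))
;; one-minute-example => [[:a :b :c] [:d]]
```

The model only captures which items end up grouped together, not when each vector is emitted; the last vector stands for whatever is flushed when the input flow terminates.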

lotuc 13:04:51

You could do early termination with eduction and reductions. I managed to implement the requirement with a transducer:

;;; Assumes (:require [missionary.core :as m]).
(defn interval [ms v]
  (let [now #(System/currentTimeMillis)]
    (m/ap (m/? (m/sleep (- (m/?> (m/seed (next (iterate (partial + ms) (now)))))
                           (now)) v)))))

(defn batch [ms >f]
  (let [>g (m/ap (m/amb (m/?> >f) ::end))
        >s (interval ms ::sep)]
    (->> (m/ap (m/?> (m/?> 2 (m/seed [>g >s]))))
         (m/eduction
          (fn [rf]
            (let [!acc (atom [])]
              (completing
               (fn [r i]
                 (case i
                   ::sep (rf r (first (reset-vals! !acc [])))
                   ::end (reduced (rf r (first (reset-vals! !acc []))))
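                   ;; Any other item is accumulated; the step function
                   ;; must still return the reduction result `r`.
                   (do (swap! !acc conj i) r))))))))))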
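The stateful transducer at the heart of this approach can be tried on its own with plain `clojure.core`, feeding the `::sep` and `::end` markers by hand instead of from a timer flow. The following is a sketch under assumptions rather than the code above: `batch-xf` is a hypothetical name, it skips empty batches (to match the "only if any values were produced" requirement), it uses `ensure-reduced` for the early stop, and it spells out the transducer arities instead of using `completing`.

```clojure
;; Hypothetical standalone variant of the transducer, for experimentation.
(defn batch-xf [rf]
  (let [!acc   (atom [])
        ;; Empty the accumulator and pass its contents downstream,
        ;; unless nothing was accumulated.
        flush! (fn [r]
                 (let [items (first (reset-vals! !acc []))]
                   (if (seq items) (rf r items) r)))]
    (fn
      ([] (rf))
      ([r] (rf r))
      ([r i]
       (case i
         ::sep (flush! r)                  ; interval tick: emit the batch
         ::end (ensure-reduced (flush! r)) ; input ended: emit and stop
         (do (swap! !acc conj i) r))))))   ; ordinary item: accumulate

(def batched
  (into [] batch-xf [1 2 ::sep ::sep 3 ::end 4 ::sep]))
;; batched => [[1 2] [3]]
```

The second `::sep` produces nothing because no items arrived in that interval, and everything after `::end` is ignored because the reduction has already been terminated.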

kawas 22:04:05

Same solution as @U027DJ7CPPU, but using more transducers with eduction and a few helper functions to make it a little more readable 😉

Eric Dvorsak 16:04:29

m/?= is deprecated in favor of m/?> ##Inf

Eric Dvorsak 16:04:14

or there's the sugar (m/amb= (m/?> flow1) (m/?> flow2)) for mixing a known number of flows
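To make the two merging styles concrete, here is a minimal sketch assuming missionary is on the classpath and the code runs on the JVM (where `m/?` can block outside a process). `merge-all` and `merge-two` are hypothetical names chosen for illustration; the forms inside them are the ones mentioned in this thread.

```clojure
(require '[missionary.core :as m])

;; Merge any number of flows: fork over the collection of flows with
;; unbounded parallelism, then fork over each flow's values.
(defn merge-all [flows]
  (m/ap (m/?> (m/?> ##Inf (m/seed flows)))))

;; Merge a known number of flows with the `amb=` sugar.
(defn merge-two [flow1 flow2]
  (m/ap (m/amb= (m/?> flow1) (m/?> flow2))))

;; Collect everything each merged flow produces.
(def merged-all
  (m/? (m/reduce conj (merge-all [(m/seed [1 2]) (m/seed [:a :b])]))))
(def merged-two
  (m/? (m/reduce conj (merge-two (m/seed [1 2]) (m/seed [:a :b])))))
```

Both results contain the values `1`, `2`, `:a` and `:b`; the interleaving between the two sources should not be relied on.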