Has anyone here come up with good sugar for (m/? (m/via m/blk ...))?
Personally I do something like this:
(defn fetch-something [...])
(defn ?fetch-something [...]
  (m/via m/blk (fetch-something ...)))
;; The caller is responsible for calling `m/?`
(m/? (?fetch-something ...))
Otherwise you can create a macro, I think.
I like the ? prefix. Do you use it to indicate a task?
Do you have a convention for flows as well, such as `<?`?
Yes, the ? prefix indicates that the function returns a task. For a flow, I use <.
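A minimal sketch of the macro idea mentioned above, for the JVM only. The name `await-blk` is hypothetical and not part of missionary. It simply expands to the `(m/? (m/via m/blk ...))` form from the question, so it should block the calling thread at the REPL and park when used inside `m/sp` or `m/ap`.

```clojure
(require '[missionary.core :as m])

;; Hypothetical helper (not a missionary API): runs the body as a task on
;; the m/blk executor and awaits its result with m/?.
(defmacro await-blk [& body]
  `(m/? (m/via m/blk ~@body)))

;; At the REPL this blocks the calling thread until the body returns.
(def result (await-blk (Thread/sleep 100) :done))
```

The trade-off compared to the `?fetch-something` convention is that the macro hides the task, so the caller can no longer compose it (for example with `m/join` or `m/timeout`) before awaiting it.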
I was wondering how to do batches from a flow (turn a flow into a flow of n values) and it turned out to be as simple as
(m/?
  (m/reduce (constantly nil) nil
    (m/ap (let [x (m/?> (m/eduction (partition-all 2)
                          (m/seed [1 2 3])))]
            (println x)))))
@andrew818 it would be interesting to run some benchmarks of the different solutions (transducers vs loop/recur vs ap with imperative state). I realize I've been using that pattern with transducers without thinking about alternatives. I wonder which one performs best, for dedupe for instance.
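For reference, a small variant of the `partition-all` snippet above that collects the batches into a vector instead of printing them, which makes the result easier to inspect. The var name `batches` is only for illustration.

```clojure
(require '[missionary.core :as m])

;; Turn a flow of values into a flow of batches of at most 2 values,
;; then reduce the batches into a vector.
(def batches
  (m/? (m/reduce conj []
         (m/eduction (partition-all 2)
           (m/seed [1 2 3])))))

;; batches => [[1 2] [3]]
```

The last batch is partial because `partition-all` flushes its remaining items when the input flow terminates.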
For loop/recur using my "transmuter" I wouldn't be surprised if it was a bit slower because it's adding an entire extra layer where values get bounced through an rdv, etc. But just speaking for myself I'm not doing anything CPU bound, so it doesn't really matter to me.
This is a very tricky problem with deep implications for the design of ap
I wrote an issue about this, feedback is welcome: https://github.com/leonoel/missionary/issues/109
batch / batching / partition / partition-all / manifold.stream/batch / stream/batch / s/batch (in case someone searches for this)
The one thing I am missing is a timeout (take 2 but if the second value takes a while then go with just one)
I saw this one a while ago but haven't tried it: https://clojurians.slack.com/archives/CL85MBPEF/p1707239083211269
I used the wrong term: it's not a timeout I'm talking about but a max latency. If there is a partial batch and max-latency has elapsed since the previous batch, the partial batch is emitted.
you can use a modified version of the debounce operator (sorry, on mobile, can't check)
(import '(missionary Cancelled))
;; Untested sketch, as noted above.
(defn debounce [dur >in]
  (m/ap
    (let [batch (atom []) ; values whose sleep was cancelled by a newer value
          x (m/?< >in)]
      ;; no newer value within dur: the sleep completes and x is emitted on its own
      (try (m/? (m/sleep dur x))
           ;; a newer value arrived before dur elapsed: add x to the batch
           (catch Cancelled _
             (let [b (swap! batch conj x)]
               ;; emit the batch once it holds 42 values, otherwise emit nothing
               (if (= (count b) 42)
                 (do (reset! batch []) (m/amb b))
                 (m/amb))))))))
When processing discrete flows we often have some internal state in the processor. In this gist I provide a mechanism where the internal state can be implemented using loop/recur instead of needing imperative updates. As an example I implement the "batch with both a maximum batch size and a maximum latency" process. https://gist.github.com/awwx/18252e376d2480b893fb95ea2c98f56b
"if or none" makes sense to me. I came across this pattern several times, the name I have in mind is where (and where-let,`where-some`) because
• it's about filtering, like SQL where
• it's the same kind of word as when, which has the same siblings with the same syntax
I like where; ifn makes me think of https://clojure.github.io/clojure/javadoc/clojure/lang/IFn.html
@yenda1 I'm struggling to find a purely functional solution to this problem. Could you share some inspiration from other languages/libraries?
manifold.stream/batch does that but I doubt it's purely functional
The requirement is a little tricky. Does the timeout start at the first value of a partition, or after the last value seen from the flow? Say we partition by 3 elements, value1 arrived 2 seconds ago, value2 arrived 1 second ago, and the timeout is 2 seconds: do we emit a pack now, or do we still have 1 second to wait after value2? Assuming the latter, I have implemented something with delayed values and custom transducers. Every value is emitted along with a delayed twin. A custom transducer then discards a delayed twin if a new value arrived in between, but if a value and its twin are consecutive, a separator mark is propagated. Finally, a custom partition-all transducer makes packs of n values and forces partitioning whenever a separator mark is found.
I think this one is close
(defn batch [max-size max-delay input]
  (m/ap (let [[_ input] (m/?> (m/group-by {} input))]
          (m/? (m/reduce conj
                 (m/eduction (take-while (complement #{::timeout})) (take max-size)
                   (m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))
What is the purpose of m/group-by in the impl? Why "close", what's missing?
without group-by, the input flow would be cancelled/restarted between successive batches
Can you explain in a little more detail why the group-by is needed? Why would the flow be restarted/cancelled without it?
I have never used group-by but it has interesting properties indeed:
> Cancelling a group consumer makes it fail immediately. If a value is subsequently found for the same grouping, the key-group pair is produced again
(m/eduction (take 3) flow) will cancel flow after taking in 3 values. The group-by will spawn a new flow that can be consumed by eduction again
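A small sketch of the restart behaviour discussed above, using a finite `m/seed` as a stand-in for the real input. The names `input` and `first-3` are only for illustration. Each time the eduction is run against the raw flow, a fresh process is started, so the second run sees the same values again instead of continuing where the first one stopped.

```clojure
(require '[missionary.core :as m])

;; Stand-in input flow; a real input would typically be a live source.
(def input (m/seed (range 10)))

;; Consume the first 3 values of a flow; the eduction terminates early
;; and cancels the underlying flow process.
(defn first-3 [flow]
  (m/? (m/reduce conj [] (m/eduction (take 3) flow))))

(def run-1 (first-3 input)) ; [0 1 2]
(def run-2 (first-3 input)) ; [0 1 2] again, the flow was restarted
```

With a live source this would mean resubscribing between batches, which appears to be what the `m/group-by` wrapper in `batch` is there to avoid.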