#missionary
2024-04-18
Eric Dvorsak11:04:29

Has anyone here come up with good sugar for (m/? (m/via m/blk ...))?

J13:04:50

Personally I do something like this:

(defn fetch-something [...])

(defn ?fetch-something [...]
  (m/via m/blk (fetch-something ...)))

;; The caller is responsible for using `m/?`:
(m/? (?fetch-something ...))
Otherwise you can create a macro, I think
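
A minimal sketch of such a macro (the name `?blk` is made up, and it assumes missionary.core is aliased as `m`; like `m/?` itself it is only usable inside m/sp, m/ap or m/cp):

(defmacro ?blk
  ;; hypothetical sugar: run body on the blocking-call executor and park
  ;; until it completes; expands to (m/? (m/via m/blk ...))
  [& body]
  `(m/? (m/via m/blk ~@body)))

;; e.g. (m/sp (?blk (slurp "some-file.txt")))  ; a task doing blocking IO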

Eric Dvorsak18:04:02

I like the ? prefix, do you use it to indicate a task?

Eric Dvorsak18:04:24

do you have a convention for a flow as well, like `<?`?

J19:04:35

Yes, the ? prefix indicates that the function returns a task. For a flow, I use <.
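
A tiny illustration of that convention (function names are made up, assuming (require '[missionary.core :as m])):

;; `?` prefix: the function returns a task
(defn ?current-time []
  (m/via m/blk (System/currentTimeMillis)))

;; `<` prefix: the function returns a flow
(defn <countdown [n]
  (m/seed (range n 0 -1)))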

Eric Dvorsak16:04:28

I was wondering how to do batches from a flow (turn a flow into a flow of batches of n values) and it turned out to be as simple as

(m/?
 (m/reduce (constantly nil) nil
           (m/ap (let [x (m/?> (m/eduction (partition-all 2)
                                           (m/seed [1 2 3])))]
                   (println x)))))
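
Packaged as a reusable function (a sketch; `<batches` is a made-up name following the `<` convention above), the same idea is just:

(defn <batches
  "Turns a flow into a flow of vectors of at most n values."
  [n <in]
  (m/eduction (partition-all n) <in))

;; (m/? (m/reduce conj (<batches 2 (m/seed [1 2 3]))))
;; => [[1 2] [3]]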

Eric Dvorsak16:04:32

batch / batching / partition / partition-all / manifold.stream/batch stream/batch s/batch (if someone is looking)

Eric Dvorsak16:04:08

The one thing I am missing is a timeout (take 2 but if the second value takes a while then go with just one)

Eric Dvorsak18:04:23

I used the wrong term: it's not a timeout I'm talking about but a max latency. If there is a partial batch and the max latency has passed since the previous batch, the partial batch is emitted.

Hendrik19:04:28

you can use a modified version of the debounce operator (sorry on mobile can’t check)

(import '(missionary Cancelled))

(defn debounce [dur >in]
  (m/ap
    (let [batch (atom [])
          x     (m/?< >in)]
      (try (m/? (m/sleep dur x))
           (catch Cancelled _
             (let [b (swap! batch conj x)]
               (if (= (count b) 42)           ;; 42 = example max batch size
                 (do (reset! batch []) (m/amb b))
                 (m/amb))))))))
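
For comparison, the unmodified debounce operator being riffed on looks roughly like this (reconstructed from memory, reusing the `Cancelled` import above, so treat it as a sketch):

(defn debounce [delay >in]
  (m/ap (let [x (m/?< >in)]
          (try (m/? (m/sleep delay x))
               (catch Cancelled _ (m/amb))))))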

leonoel09:04:27

@U03K8V573EC I'm struggling to find a purely functional solution to this problem. Could you share some inspiration from other languages/libraries?

Eric Dvorsak09:04:45

manifold.stream/batch does that but I doubt it's purely functional

kawas09:04:40

The requirement is a little bit tricky. Does the timeout start at the first value of a partition or after the last value seen from the flow? If we do 3-element partitions and we had value1 2 seconds ago and value2 1 second ago, with a timeout of 2 seconds, do we emit a pack, or do we still have 1 second to wait after value2? Assuming the latter case, I have implemented something with delayed values and custom transducers. Every value is emitted with a delayed twin; a custom transducer then discards a delayed twin if a new value came in between, but if a value and its twin are consecutive, a separator mark is propagated. Finally, a custom partition-all transducer can make packs of n values and force partitioning when the separator mark is found.

leonoel09:04:44

I think this one is close

(defn batch [max-size max-delay input]
  (m/ap (let [[_ input] (m/?> (m/group-by {} input))]
          (m/? (m/reduce conj
                 (m/eduction (take-while (complement #{::timeout})) (take max-size)
                   (m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))

🙌 2
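
A quick way to try it out (the input flow is just a stand-in; exact batch boundaries depend on timing):

;; at most 3 values per batch, flush a partial batch after 100 ms
(m/? (m/reduce conj (batch 3 100 (m/seed (range 10)))))
;; => something like [[0 1 2] [3 4 5] [6 7 8] [9]]
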
xificurC11:04:22

what is the purpose of m/group-by in the impl? Why "close", what's missing?

leonoel11:04:44

without group-by, the input flow would be cancelled/restarted between successive batches

💡 2
Hendrik12:04:10

Can you explain in a little more detail why the group-by is needed? Why would the flow be restarted/cancelled without it?

leonoel10:04:17

This is a very tricky problem with deep implications for the design of ap. I wrote an issue about this, please give feedback: https://github.com/leonoel/missionary/issues/109

👀 4
Andrew Wilcox08:05:35

When processing discrete flows we often have some internal state in the processor. In this gist I provide a mechanism where the internal state can be implemented using loop/recur instead of needing to use imperative updates. As an example I implement the "batch with both a maximum batch size and a maximum latency" process. https://gist.github.com/awwx/18252e376d2480b893fb95ea2c98f56b

👍 3
leonoel19:05:36

"if or none" makes sense to me. I came across this pattern several times, the name I have in mind is where (and where-let,`where-some`) because • it's about filtering, like SQL where • it's the same kind of word as when, which has the same siblings with the same syntax

👍 2
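
If I read the proposal right, a strawman `where` for use inside ap could be as small as this (my own sketch, not from the gist):

(defmacro where
  "Like `when`, but the else case yields no value at all instead of nil.
   Only meaningful inside m/ap."
  [test & body]
  `(if ~test (do ~@body) (m/amb)))

;; keep only the even values of a flow
(m/ap (let [x (m/?> (m/seed (range 10)))]
        (where (even? x) x)))
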
Eric Dvorsak08:05:30

@U05P1LZ8EQH it would be interesting to run some benchmarks for the different solutions (transducers vs loop/recur vs ap with imperative state). I realize I've been using that pattern with transducers without thinking about alternatives; I wonder which one offers the best performance, for dedupe for instance.
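
For concreteness, the transducer and imperative-state shapes for dedupe might look roughly like this (sketches with made-up names; the loop/recur transmuter variant lives in the gist). Note the atom version keeps its state per call, so the returned flow should only be run once:

;; 1. transducer
(defn <dedupe-xf [<in]
  (m/eduction (dedupe) <in))

;; 2. ap with imperative state
(defn <dedupe-ap [<in]
  (let [prev (atom ::none)]
    (m/ap (let [x (m/?> <in)]
            (if (= x @prev)
              (m/amb)                      ;; duplicate, emit nothing
              (do (reset! prev x) x))))))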

Andrew Wilcox08:05:55

For loop/recur using my "transmuter" I wouldn't be surprised if it was a bit slower because it's adding an entire extra layer where values get bounced through an rdv, etc. But just speaking for myself I'm not doing anything CPU bound, so it doesn't really matter to me.
