missionary

Eric Dvorsak 2024-04-18T11:19:29.623679Z

Has anyone here come up with good sugar for (m/? (m/via m/blk ...))?

J 2024-04-18T13:38:50.053549Z

Personally I do something like this:

(defn fetch-something [...])

(defn ?fetch-something [...]
  (m/via m/blk (fetch-something ...)))

;; The caller is responsible for calling m/? on the returned task
(m/? (?fetch-something ...))

Otherwise, you could create a macro, I think.
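A minimal sketch of the macro route, assuming `missionary.core` is aliased as `m`. The macro name `blocking` and the `?two-sleeps` example are hypothetical, not part of Missionary. Because `m/sp` and `m/ap` expand macros in their bodies, a macro that expands to `m/?` should be usable inside them.

```clojure
(ns example.blocking
  (:require [missionary.core :as m]))

;; Hypothetical helper (not part of Missionary): runs body on the blocking
;; thread pool via m/via + m/blk and parks on the result with m/?.
;; Use it inside m/sp or m/ap; on the JVM it also works at top level,
;; where m/? blocks the calling thread.
(defmacro blocking [& body]
  `(m/? (m/via m/blk ~@body)))

;; Example: a task (note the ? prefix) doing two blocking calls in sequence.
(def ?two-sleeps
  (m/sp
    (blocking (Thread/sleep 10))
    (blocking (Thread/sleep 10) :done)))

(comment
  (m/? ?two-sleeps))
;; => :done
```

The trade-off versus J's `?fetch-something` style is that the macro hides the task entirely, so callers can no longer compose it with `m/race`, `m/timeout`, etc.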

Eric Dvorsak 2024-04-18T18:22:02.811879Z

I like the ? prefix. Do you use it to indicate a task?

Eric Dvorsak 2024-04-18T18:22:24.473669Z

do you have a convention for a flow as well like <? ?

J 2024-04-18T19:17:35.970309Z

Yes, the ? prefix indicates that the function returns a task. For a flow, I use <.
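A small sketch of that naming convention; `slow-square`, `?slow-square` and `<squares` are hypothetical names made up for illustration.

```clojure
(ns example.naming
  (:require [missionary.core :as m]))

(defn slow-square [x]          ; plain blocking function
  (Thread/sleep 10)
  (* x x))

(defn ?slow-square [x]         ; `?` prefix: returns a task
  (m/via m/blk (slow-square x)))

(defn <squares [xs]            ; `<` prefix: returns a flow
  ;; m/?> with default parallelism handles one value at a time, in order
  (m/ap (m/? (?slow-square (m/?> (m/seed xs))))))

(comment
  (m/? (?slow-square 3))                        ;; => 9
  (m/? (m/reduce conj [] (<squares [1 2 3]))))  ;; => [1 4 9]
```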

Eric Dvorsak 2024-04-18T16:22:28.286889Z

I was wondering how to batch a flow (turn a flow of values into a flow of batches of n values), and it turned out to be as simple as:

(m/?
 (m/reduce (constantly nil) nil
           (m/ap (let [x (m/?> (m/eduction (partition-all 2)
                                           (m/seed [1 2 3])))]
                   (println x)))))
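The same idea without the printing `m/ap`, as a sketch: `m/eduction` applies the `partition-all` transducer directly, and because the transducer's completion step runs when the input terminates, the trailing partial batch is emitted too. The helper name `batches` is hypothetical.

```clojure
(ns example.batches
  (:require [missionary.core :as m]))

;; Turn a flow of values into a flow of vectors of up to n values.
(defn batches [n >in]
  (m/eduction (partition-all n) >in))

(comment
  ;; Collect every batch into a vector instead of printing it.
  (m/? (m/reduce conj [] (batches 2 (m/seed [1 2 3])))))
;; => [[1 2] [3]]
```

Note this only batches by size; there is no time-based flush.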

Eric Dvorsak 2024-05-02T08:20:30.574169Z

@andrew818 It would be interesting to run some benchmarks on the different solutions (transducers vs. loop/recur vs. ap with imperative state). I realize I've been using the transducer pattern without thinking about alternatives; I wonder which one performs best, for dedupe for instance.
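No benchmark numbers are given in the thread, but as a starting point, here is a sketch of two of the alternatives for dedupe side by side (the loop/recur variant depends on Andrew's gist and is not reproduced). The function names are hypothetical.

```clojure
(ns example.dedupe
  (:require [missionary.core :as m]))

;; Alternative 1: clojure.core's dedupe transducer applied with m/eduction.
(defn dedupe-xf [>in]
  (m/eduction (dedupe) >in))

;; Alternative 2: m/ap with imperative state. The volatile is bound before
;; the fork, so it is created once; m/?> with default parallelism processes
;; values one at a time, so the unsynchronized state is safe here.
(defn dedupe-ap [>in]
  (m/ap
    (let [prev (volatile! ::none)
          x (m/?> >in)]
      (if (= x @prev)
        (m/amb)                  ; duplicate: produce nothing
        (do (vreset! prev x) x)))))

(comment
  (m/? (m/reduce conj [] (dedupe-xf (m/seed [1 1 2 2 1 3 3]))))  ;; => [1 2 1 3]
  (m/? (m/reduce conj [] (dedupe-ap (m/seed [1 1 2 2 1 3 3]))))) ;; => [1 2 1 3]
```

A tool such as criterium could then time both on the same input.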

Andrew Wilcox 2024-05-02T08:24:55.893199Z

For loop/recur using my "transmuter" I wouldn't be surprised if it was a bit slower because it's adding an entire extra layer where values get bounced through an rdv, etc. But just speaking for myself I'm not doing anything CPU bound, so it doesn't really matter to me.

leonoel 2024-04-21T10:30:17.422959Z

This is a very tricky problem with deep implications for the design of ap. I wrote an issue about this, feedback welcome: https://github.com/leonoel/missionary/issues/109

👀 2
Eric Dvorsak 2024-04-18T16:23:32.057329Z

Search keywords, in case someone is looking: batch / batching / partition / partition-all / manifold.stream/batch / stream/batch / s/batch

Eric Dvorsak 2024-04-18T16:24:08.646139Z

The one thing I am missing is a timeout (take 2, but if the second value takes too long, go with just one).

2024-04-18T18:31:03.262679Z

I saw that once but didn't try it: https://clojurians.slack.com/archives/CL85MBPEF/p1707239083211269

Eric Dvorsak 2024-04-18T18:34:23.067439Z

I used the wrong term: it's not a timeout I'm talking about but a max latency. If there is a partial batch and max-latency has passed since the previous batch, the partial batch is emitted.

Hendrik 2024-04-18T19:49:28.348739Z

You can use a modified version of the debounce operator (sorry, on mobile, can't check):

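(require '[missionary.core :as m])
(import '(missionary Cancelled))

;; Emits the accumulated batch when it reaches max-size, or once no new value
;; has arrived for dur milliseconds (debounce-style latency, measured from the last value).
(defn debounce-batch [max-size dur >in]
  (m/ap
    (let [batch (atom [])              ; created once, before the switch below
          x (m/?< >in)                 ; a new value cancels the pending branch
          b (swap! batch conj x)]
      (if (= (count b) max-size)
        (do (reset! batch []) b)       ; batch full: emit it right away
        (try (m/? (m/sleep dur))       ; wait for a quiet period of dur ms
             (first (reset-vals! batch [])) ; then flush the partial batch
             (catch Cancelled _ (m/amb)))))))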

Andrew Wilcox 2024-05-01T08:23:35.488229Z

When processing discrete flows we often have some internal state in the processor. In this gist I provide a mechanism where the internal state can be implemented using loop/recur instead of imperative updates. As an example, I implement the "batch with both a maximum batch size and a maximum latency" process. https://gist.github.com/awwx/18252e376d2480b893fb95ea2c98f56b

πŸ‘ 4
leonoel 2024-05-01T19:41:36.460919Z

"if or none" makes sense to me. I have come across this pattern several times; the name I have in mind is `where` (and `where-let`, `where-some`) because:
• it's about filtering, like SQL `where`
• it's the same kind of word as `when`, which has the same siblings with the same syntax

πŸ‘ 2
Eric Dvorsak 2024-05-01T19:55:44.312909Z

I like `where`; `ifn` makes me think of https://clojure.github.io/clojure/javadoc/clojure/lang/IFn.html

leonoel 2024-04-19T09:23:27.289529Z

@yenda1 I'm struggling to find a purely functional solution to this problem. Could you share some inspiration from other languages/libraries ?

Eric Dvorsak 2024-04-19T09:24:45.390109Z

manifold.stream/batch does that but I doubt it's purely functional

2024-04-19T09:50:40.752219Z

The requirement is a little tricky. Does the timeout start at the first value of a partition or after the last value seen from the flow? Say we make 3-element partitions, value1 arrived 2 seconds ago, value2 arrived 1 second ago, and the timeout is 2 seconds: do we emit a pack now, or do we still have 1 second to wait after value2?

Assuming the latter, I have implemented something with delayed values and custom transducers. Every value is emitted along with a delayed twin; a custom transducer then discards a delayed twin if a new value arrived in between, but if a value and its twin are successive, a separator mark is propagated. Finally, a custom partition-all transducer makes packs of n values and forces a partition when a separator mark is found.
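A sketch of the last step only, assuming the separator mark is a plain value in the stream: a `partition-all` variant that also cuts the current partition when it sees the separator. `partition-all-sep` and `::sep` are hypothetical names, and the delayed-twin and twin-discarding steps are not shown. Since it is an ordinary Clojure transducer, it works with `into` as well as `m/eduction`.

```clojure
(ns example.partition-sep)

(defn partition-all-sep
  "Hypothetical transducer: like partition-all, but also closes the current
  partition when `sep` is seen. The separator itself is dropped and empty
  partitions are never emitted."
  [n sep]
  (fn [rf]
    (let [buf (volatile! [])
          ;; emit the buffered values (if any) and start a fresh buffer
          emit-buf (fn [result]
                     (let [b @buf]
                       (vreset! buf [])
                       (if (empty? b) result (rf result b))))]
      (fn
        ([] (rf))
        ;; completion: flush the trailing partial partition
        ([result] (rf (unreduced (emit-buf result))))
        ([result x]
         (if (= x sep)
           (emit-buf result)
           (do (vswap! buf conj x)
               (if (= n (count @buf)) (emit-buf result) result))))))))

(comment
  (into [] (partition-all-sep 3 ::sep) [1 2 ::sep 3 4 5 6 ::sep ::sep 7]))
;; => [[1 2] [3 4 5] [6] [7]]
```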

leonoel 2024-04-19T09:56:44.850199Z

I think this one is close

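(defn batch [max-size max-delay input]
  (m/ap (let [;; {} as key fn maps every value to nil: a single group, produced
              ;; again by group-by after each batch cancels its consumer
              [_ input] (m/?> (m/group-by {} input))]
          ;; collect up to max-size values, or fewer if ::timeout fires first
          (m/? (m/reduce conj
                 (m/eduction (take-while (complement #{::timeout})) (take max-size)
                   (m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))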

🙌 1
xificurC 2024-04-19T11:04:22.990429Z

what is the purpose of m/group-by in the impl? Why "close", what's missing?

leonoel 2024-04-19T11:08:44.268849Z

without group-by, the input flow would be cancelled/restarted between successive batches

Hendrik 2024-04-19T12:08:10.740369Z

Can you explain in a little more detail why the group-by is needed? Why would the flow be restarted/cancelled without it?

2024-04-19T13:21:59.595159Z

I have never used group-by, but it does have interesting properties:
> Cancelling a group consumer makes it fail immediately. If a value is subsequently found for the same grouping, the key-group pair is produced again

😲 1
xificurC 2024-04-19T13:58:52.162239Z

(m/eduction (take 3) flow) will cancel flow after taking in 3 values. The group-by will spawn a new flow that can be consumed by eduction again

☝️ 3