Fork me on GitHub
#missionary
<
2021-12-15
>
ribelo00:12:34

In this case, shouldn't the oldest values be discarded?

(def atm_ (atom 0))
(def >f (mi/watch atm_))

(def r ((mi/reduce (constantly nil)
                   (->> (mi/ap (let [x (mi/?> (mi/relieve {} >f))]
                                 (mi/? (mi/sp (mapv inc (range 1e7)) x)))) ;; takes about 200msc
                        (mi/eduction (map prn) (take 10))))
        prn prn))

(run! #(reset! atm_ %) (range 10))

ribelo00:12:54

How can I drop the oldest values?

leonoel10:12:21

the problem here is (mi/sp (mapv inc (range 1e7)) x) is not cancellable

leonoel10:12:58

also (mi/relieve {} >f) is not needed, watch already discards oldest values

ribelo10:12:02

but Cancelling an sp task triggers cancellation of the task it's currently running, along with all tasks subsequently run.

leonoel10:12:42

yes, but cancellation can only happen on specific points, and there aren't any in this one

leonoel10:12:28

because it's a purely synchronous computation, therefore it is assumed to be fast

ribelo10:12:17

so basically relieve relieving backpresure only if sleep is used?

leonoel10:12:11

not only sleep, any task supporting cancellation

ribelo10:12:24

by analogy to core.async, how to get the identical result in that case?

(def c (a/chan (a/dropping-buffer 5)))

(def r (a/go (loop []
               (when-let [v (a/<! c)]
                 (mapv inc (range 1e7))
                 (println v)
                 (recur)))))

(run! #(a/put! c %) (range 10))

leonoel10:12:10

(def m (m/mbx))

  (def >input
    (->> (m/ap (loop [] (m/amb> (m/? m) (recur))))
      (m/relieve (fn [x _] x))
      (m/buffer 5)))

  (defn heavy [x]
    (m/via m/cpu (mapv inc (range 1e7)) x))

  (def r
    ((m/reduce {} nil
       (m/ap (println (m/? (heavy (m/?> >input))))))
     prn prn))

  (run! m (range 10))

leonoel11:12:37

I'm not sure it produces the exact same result, but it is the same intent

leonoel11:12:55

unlike go blocks, sp and ap have no dedicated thread pool, therefore any cpu-intensive computation should be wrapped in a task with via cpu

leonoel11:12:47

you can insert a cancellation checkpoint with !

(defn heavy [x]
  (m/via m/cpu (mapv (fn [x] (m/!) (inc x)) (range 1e7)) x))

ribelo22:12:00

with m/via it does indeed work, but how about cljs? it does not implement m/via

ribelo22:12:30

Also, I tried using m/! but it looks like it has no effect on anything and doesn't work

ribelo22:12:49

if use m/via it works without m/!, but m/! won't work without m/via

ribelo23:12:07

out of curiosity I checked on cljs and the conditions for pool are never satisfied

ribelo23:12:52

.-token is never nil, or (.-type choice) is never switch

ribelo23:12:33

I wrote it wrong

ribelo23:12:52

I can't make it so that any of the conditions are met

leonoel09:12:50

sure, cljs doesn't have threads

leonoel09:12:58

but ! still works in sp and ap , as long as the code is asynchronous

ribelo11:12:53

would you be so kind and show an example of how to use it? unfortunately I can't create any without using sleep, where relieve and ! works 😞

leonoel13:12:21

in cljs you should not perform heavy computations anyways, and if there's no other choice you should split the computation with a zero timeout to release control to the event loop, so sleep is the way to go

ribelo20:12:59

thx ❤️

Johan01:12:20

Should http call be wrapped in a via ? (m/via m/blk (http-call))

Ben Sless05:12:43

If it's blocking. If it has callbacks then it's a task

👍 1