Fork me on GitHub
#missionary
<
2023-12-09
>
J13:12:33

Hi guys! It's valid to do computation in the function of m/reduce like this example:

(def !x (atom {}))

(defn on-change! [event]
  (m/? (m/join vector (m/via m/blk (f1)) (m/via m/blk (f2)))))

(def run-listener
  (let [<event (m/signal (m/watch !x))]
    (m/reduce (fn [_ event] (on-change! event)) nil <event)))

Dustin Getz13:12:54

m/? inside clojure fn (as opposed to m/sp m/ap etc) is blocking

J13:12:39

So it's better to not blocking here?

Dustin Getz14:12:42

you only want to block inside the m/via threadpool

Dustin Getz14:12:03

try using m/? inside m/cp instead, so that the reduce callback is fast

Dustin Getz14:12:18

do the slow computation right before the reduce if possible i’m saying - hard to discern your intent here

J14:12:26

Thanks Dustin. The !x atom holding some redis counters. Every 30s, this atom is updated and I want to perform some task on the new atom value.

leonoel16:12:51

? is illegal inside cp

leonoel16:12:15

(defn on-change [event]
  (m/join vector (m/via m/blk (f1)) (m/via m/blk (f2))))

(def run-listener
  (m/reduce (constantly nil)
    (m/ap (m/? (on-change (m/?> (m/watch !x)))))))

J16:12:22

Many thanks guys!

telekid23:12:42

> ? is illegal inside cp Is that because it can prevent the continuous flow from having an initial value?

leonoel08:12:38

not only the initial value, as long as one expression is undefined the current state is undefined too

👍 1