Fork me on GitHub
#missionary
<
2021-09-01
>
martinklepsch22:09:28

I’ve been a little surprised today by the fact that combining a m/observe and m/eduction will cancel the observe flow unless there’s also a m/reduce at the end (might also be described as “flow is cancelled if there’s no consumer “)

leonoel07:09:55

could you share a snippet ?

martinklepsch12:09:06

(def click-log
   (->> (m/observe
         (fn [!]
           (.addEventListener js/window "click" !)
           #(.removeEventListener js/window "click" !)))
        (m/eduction (map js/console.log))))

 (def cancel
   (click-log #(js/console.log :done) #(js/console.log :error %)))
with this I’m getting a “consumer is not ready” error

leonoel13:09:57

observe is not backpressured, therefore it assumes the pipeline will process events fast enough. If not, the callback will throw "consumer is not ready". To prevent this, you should insert a relieve stage between observe and the slow consumer. If you're dealing with continuous values, the common idiom is to relieve with {}, ie discard oldest value.

leonoel13:09:23

as an aside, this is not the right way to run a flow. The low-level protocol is significantly different from tasks, you should probably use reduce or stream! with reactor instead, and only then run the resulting task

martinklepsch22:09:52

Is there an example of how to combine multiple flows into one somewhere? In particular I’m trying to build up subscriptions to a database into a sort of materialized view on the client. This involves combining multiple input flows (subscriptions)

ribelo22:09:58

I tried to do something identical, but I failed

ribelo22:09:47

but somewhere in the archives should be my conversation with @U053XQP4S from which I understand less than I would like 🙃

mjmeintjes02:09:43

amb> can be used to combine flows sequentially: (m/? (m/reduce conj (m/ap (m/amb> (m/?> (m/seed [1 2 3])) (m/?> (m/seed [4 5 6])))))) returns [1 2 3 4 5 6]

leonoel07:09:40

see also zip and latest

martinklepsch12:09:48

I think latest is what I was looking for! 🙂