Fork me on GitHub

I’ve been a little surprised today by the fact that combining a m/observe and m/eduction will cancel the observe flow unless there’s also a m/reduce at the end (might also be described as “flow is cancelled if there’s no consumer “)


could you share a snippet ?


(def click-log
   (->> (m/observe
         (fn [!]
           (.addEventListener js/window "click" !)
           #(.removeEventListener js/window "click" !)))
        (m/eduction (map js/console.log))))

 (def cancel
   (click-log #(js/console.log :done) #(js/console.log :error %)))
with this I’m getting a “consumer is not ready” error


observe is not backpressured, therefore it assumes the pipeline will process events fast enough. If not, the callback will throw "consumer is not ready". To prevent this, you should insert a relieve stage between observe and the slow consumer. If you're dealing with continuous values, the common idiom is to relieve with {}, ie discard oldest value.


as an aside, this is not the right way to run a flow. The low-level protocol is significantly different from tasks, you should probably use reduce or stream! with reactor instead, and only then run the resulting task


Is there an example of how to combine multiple flows into one somewhere? In particular I’m trying to build up subscriptions to a database into a sort of materialized view on the client. This involves combining multiple input flows (subscriptions)


I tried to do something identical, but I failed


but somewhere in the archives should be my conversation with @U053XQP4S from which I understand less than I would like 🙃


amb> can be used to combine flows sequentially: (m/? (m/reduce conj (m/ap (m/amb> (m/?> (m/seed [1 2 3])) (m/?> (m/seed [4 5 6])))))) returns [1 2 3 4 5 6]


see also zip and latest


I think latest is what I was looking for! 🙂