Fork me on GitHub
#missionary
<
2023-10-23
>
J12:10:31

Hi guys! I play with rama (reactive query) and I have this missionary error: Can't process event - observer is not ready . Can you explain me the reason about this error? This error occur with this code:

(defn make-reactive-query [path pstate]
  (->> (m/observe
        (fn [!]
          (! nil)
          (let [p (r/foreign-proxy path pstate {:callback-fn (proxy-callback !)})]
            #(.close p))))
       (m/latest (fn [i] i))))
PS: If I remove (! nil) there is no error but it is correct to do this?

xificurC12:10:48

m/latest requires its input flows to have an initial value. (! nil) provides the m/observe flow with an initial value of nil. Hard to say more unless you give some context what are you trying to do (why are you calling m/latest, what behavior are you trying to accomplish)

J12:10:38

I want to create a flow in order to use it in electric.

xificurC12:10:37

if the initial value of nil works for you then that's fine. If you don't want to operate on the values until the first one comes in you can run (! (Failure. (Pending.))) which will throw Pending in electric. Instead of m/latest you should (m/relieve {}) , if dropping values is OK, not sure what foreign-proxy returns

catjam 1
J12:10:22

Perfect! Thanks for the explanation.

Dustin Getz10:10:01

the error is due to backpressure. relieve will make the error go away but i think is not quite the answer here. the problem is this rama API blocks until the first value is available and then calls the callback immediately. Resulting in two events emitted from observe during construction. So you either want to remove the nil emission, or you want a more complex initialization that does not block your flow construction on waiting for rama.

Dustin Getz10:10:59

also a bug was just fixed and released this week impacting this scenario of two emissions during observe constructor. the bug was that the constructor was incorrectly succeeding, it is correct to throw backpressure error here. so if you updated that explains why it was working before and isn’t now.

Dustin Getz10:10:19

also what does the rama API emit, a latest value or an event? it’s unclear if it is legal to simply relieve backpressure here as Peter said because you will miss rama events when rama updates more rapidly than your application consumer.

J12:10:06

> The rama api foreign-proxy return a stateful object called ProxyState . > The callback is trigger as soon as the data changes on the server. https://redplanetlabs.com/docs/~/pstates.html#_reactive_queries

xificurC12:10:59

after reading the docs, rama has API to observe the value as a whole (via .get()), but you can also attach a callback that will observe the diffs. In the future, once differential electric lands, plugging the diffs into electric will be simple and you'll get fine-grained reactivity all the way from rama to your UI. Today, if the returned query result is small, the pragmatic approach is to observe the value and e/for-by over it in electric. This will cause re-diffing, but for small collections you shouldn't notice issues.

Dustin Getz12:10:37

i think the diffs here are history sensitive, you'd need to see all diffs from the beginning of time. Or initialize the state with the initial value and then diff from that point. Presumably this is why rama has both apis?

Dustin Getz12:10:58

materialized offers both apis for this reason, you first query the initial value and then subscribe to diffs from that point forward