Fork me on GitHub
#missionary
<
2023-05-15
>
J16:05:49

Hello. First try of missionary and datomic (peer) (https://gist.github.com/jeans11/845fdeedbf1257a8a3355d03f6bd4b31). Works with electric. Is this a correct way to consume the tx-report-queue ? I didn’t find a way to use observe .

leonoel18:05:37

(defn latest-db [!conn]
  (m/relieve {}
    (m/observe
      (fn [!]
        (! (d/db !conn))
        (let [q (d/tx-report-queue !conn)
              t (Thread. ^Runnable
                  (fn []
                    (! (:db-after (.take ^Queue q)))
                    (recur)))]
          (.start t)
          #(do (.interrupt t) (.join t)
               (d/remove-tx-report-queue !conn)))))))
this is the observe way (warning - untested)

👍 2
leonoel18:05:16

your version is correct too

Dustin Getz21:06:51

How do the above approaches compare to this:

Dustin Getz21:06:25

I also don't understand the reference to m/signal (the upcoming primitive in missionary-next), can you spell it out for me please

leonoel21:06:27

datomic shares a single queue to all subscribers, so when there is more than one subscriber only one can observe a given change. m/signal acts as a proxy publisher enforcing a single subscription

Dustin Getz13:06:52

For completeness, this is a modified version of Leo's latest-db above, that silences log spam — JVM threads, when they terminate due to uncaught exception, the thread calls getUncaughtExceptionHandler which seems to have a JVM default impl that prints the exception to stderr. So we silence that.

👍 4
Dustin Getz16:06:39

Actually this seems to leak the thread on cancellation? I think we want to use m/via per J’s original post, and pair it with the defonce singleton pattern for correct broadcast to multiple sessions. That would mean cancellation is handled by m/via , no threads ever die (since we're leveraging missionary threadpool) and therefore there shouldnt be any logspam, right?

leonoel16:06:11

the thread is interrupted on cancellation, why should it leak ?

Dustin Getz17:06:02

Oh, the exception is signaling the thread is being harvested, i read it wrong and thought it was silencing the ThreatInterruptException that causes the thread to terminate in the first place (IIRC)