Fork me on GitHub
#missionary
<
2023-05-11
>
J07:05:52

Hi! I continue my learning on missionary and I try to create a flow in order to observe datomic report queue but I have a mistake. I try this code:

(def f (->> (m/observe (fn [!]
                           (! (d/tx-report-queue conn))
                           #(d/remove-tx-report-queue conn)))
              (m/relieve {})
              (m/latest (fn [_] (d/db conn)))))

(m/? (m/reduce conj f))
But when a run the task the process block the repl and doesn’t response. I don’t see why.

leonoel07:05:47

The repl is blocked because the process never terminates, which is expected. I don't recommend using blocking m/? at the repl because most of them (if not all) don't support interrupting a pending evaluation. Use callbacks instead :

(def cancel ((m/reduce conj f) #(prn :success %) #(prn :failure %)))
;; process is running now
(cancel)

J07:05:02

Aaah yes! I miss this! Thanks

J07:05:13

Hello. I miss something here because the process block the repl:

(def poll-queue (m/sp (.poll ^LinkedBlockingQueue tx-queue)))

(def polling
  (m/ap
    (loop []
      (m/amb (m/? poll-queue)
             (recur)))))

(def run (->> polling
              (m/eduction (remove nil?))
              (m/reduce (fn [_ item]
                          (println 'poll item)
                          item)
                        nil)))

(def cancel-run (run #(prn %) #(prn %)))
It’s the polling flow is good here?

xificurC08:05:07

looks like a CPU heater, since .poll returns immediately. You'll need to use .take, which is blocking.

(def poll-queue (m/via m/blk (.take ^LinkedBlockingQueue tx-queue)))
Now you might be able to drop the remove nil? call as well

👍 2
J08:05:13

This is perfect! Thanks @U09FL65DK

👍 2