This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-04-20
Channels
- # ai (1)
- # announcements (4)
- # babashka (61)
- # babashka-sci-dev (14)
- # biff (15)
- # calva (34)
- # clj-kondo (43)
- # clj-on-windows (1)
- # clojure (123)
- # clojure-europe (31)
- # clojure-nl (1)
- # clojure-norway (8)
- # clojure-uk (4)
- # clojurescript (12)
- # code-reviews (2)
- # community-development (9)
- # core-async (3)
- # datahike (4)
- # datomic (61)
- # events (1)
- # graphql (3)
- # hyperfiddle (155)
- # introduce-yourself (1)
- # lsp (64)
- # malli (10)
- # pathom (10)
- # reagent (5)
- # reitit (6)
- # shadow-cljs (2)
- # tools-build (2)
- # vim (8)
- # xtdb (3)
Hey team! I want to create a worker that tail the Postgres WAL and does some work. As the first step, I wanted to create an abstraction which handles connecting to the PG wal, reading data, parsing it, and passing it along. future + core-async looks was my first idea: I could hide this complexity and just return a channel that receives wal records. Here's the code:
(defn subscribe> [pool config slot-name]
(let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
replication-conn (get-pg-replication-conn config)
stream (start-replication-stream replication-conn slot-name lsn)
ch (chan)]
(future
(try
(loop []
(let [buffer (.read stream)
record (wal-buffer->record buffer)
last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
(when (>!! ch record)
(.setAppliedLSN stream last-receive-lsn)
(.setFlushedLSN stream last-receive-lsn)
(recur))))
(catch Exception e
(>!! ch e)
(a/close! ch)))
(.close replication-conn)
(.close stream)
(delete-replication-slot! pool slot-name))
ch))
In brief:
• Set up all the pg-related replication stream stuff
• Parse the stream, converting the bytebuffer to an object
• Pass that along to a channel
• Return the channel
• If a caller wants to stop listening, they can close the channel. This will trigger some bookkeeping here (closing replication-coon, stream, and deleting the replication-slot)
I am a noob in core-async though. I am not sure how idiomatic this is, or if there's any gotchas. Thoughts much appreciated!Current edit:
(defn- consume-stream
[stream cb]
(loop []
(let [buffer (.read stream)
record (wal-buffer->record buffer)
last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
(cb nil record)
(.setAppliedLSN stream last-receive-lsn)
(.setFlushedLSN stream last-receive-lsn)
(recur))))
(defn subscribe
[pool config slot-name cb]
(let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
replication-conn (get-pg-replication-conn config)
stream (start-replication-stream replication-conn slot-name lsn)
fut (fut-bg
(try
(log/infof "[wal] starting replication stream: %s" slot-name)
(consume-stream stream cb)
(catch Exception e (cb e nil))
(finally
(log/infof "[wal] closed replication stream: %s" slot-name)
(.close replication-conn)
(.close stream)
(delete-replication-slot! pool slot-name))))]
(fn stop []
(.close stream)
@fut)))
I realized a cb works just as simply here, so removed the use of core.async. I separated the logic into two different functions.