Hey team! I want to create a worker that tails the Postgres WAL and does some work. As a first step, I wanted to create an abstraction that handles connecting to the PG WAL, reading data, parsing it, and passing it along. future + core.async was my first idea: I could hide this complexity and just return a channel that receives WAL records. Here's the code:
(defn subscribe> [pool config slot-name]
  (let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
        replication-conn (get-pg-replication-conn config)
        stream (start-replication-stream replication-conn slot-name lsn)
        ch (chan)]
    (future
      (try
        (loop []
          (let [buffer (.read stream)
                record (wal-buffer->record buffer)
                last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
            (when (>!! ch record)
              (.setAppliedLSN stream last-receive-lsn)
              (.setFlushedLSN stream last-receive-lsn)
              (recur))))
        (catch Exception e
          (>!! ch e)
          (a/close! ch)))
      (.close replication-conn)
      (.close stream)
      (delete-replication-slot! pool slot-name))
    ch))
In brief:
• Set up all the pg-related replication stream stuff
• Parse the stream, converting the bytebuffer to an object
• Pass that along to a channel
• Return the channel
• If a caller wants to stop listening, they can close the channel. This will trigger some bookkeeping here (closing replication-conn and stream, and deleting the replication slot)
I am a noob in core.async though. I am not sure how idiomatic this is, or if there are any gotchas. Thoughts much appreciated!
Current edit:
(defn- consume-stream
  [stream cb]
  (loop []
    (let [buffer (.read stream)
          record (wal-buffer->record buffer)
          last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
      (cb nil record)
      (.setAppliedLSN stream last-receive-lsn)
      (.setFlushedLSN stream last-receive-lsn)
      (recur))))

(defn subscribe
  [pool config slot-name cb]
  (let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
        replication-conn (get-pg-replication-conn config)
        stream (start-replication-stream replication-conn slot-name lsn)
        fut (fut-bg
              (try
                (log/infof "[wal] starting replication stream: %s" slot-name)
                (consume-stream stream cb)
                (catch Exception e (cb e nil))
                (finally
                  (log/infof "[wal] closed replication stream: %s" slot-name)
                  (.close replication-conn)
                  (.close stream)
                  (delete-replication-slot! pool slot-name))))]
    (fn stop []
      (.close stream)
      @fut)))
I realized a callback (cb) works just as well here, so I removed core.async. I also split the logic into two functions: consume-stream does the read loop, and subscribe handles setup and cleanup and returns a stop function.
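For anyone who wants to try the calling convention without a Postgres instance, here is a rough sketch of the caller side. It is only an illustration: `fake-subscribe`, `handle-wal-event`, `received`, and `errors` are made-up names, and `fake-subscribe` is a stand-in that feeds a fixed list of maps to the callback instead of reading a replication stream. It keeps the same `(cb err record)` shape and also returns a stop function.

```clojure
;; Hypothetical stand-in for `subscribe`: no Postgres involved.
;; Calls (cb nil record) for each record on a background thread,
;; calls (cb e nil) if something throws, and returns a stop fn.
(defn fake-subscribe
  [records cb]
  (let [stopped? (atom false)
        fut (future
              (try
                (doseq [r records
                        :while (not @stopped?)]
                  (cb nil r))
                (catch Exception e
                  (cb e nil))))]
    (fn stop []
      (reset! stopped? true)
      ;; Block until the background thread has finished.
      @fut)))

;; Caller side: keep records and errors apart.
(def received (atom []))
(def errors (atom []))

(defn handle-wal-event
  [err record]
  (if err
    (swap! errors conj err)
    (swap! received conj record)))

(def stop (fake-subscribe [{:lsn 1} {:lsn 2} {:lsn 3}] handle-wal-event))

;; Stopping right away may cut the stream short, so `received`
;; ends up holding some prefix of the records, in order.
(stop)
```

The error-first argument order means the callback has exactly one place to branch on failure, and the caller never has to inspect a record to find out whether it is really an exception.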