#code-reviews
2023-04-20
stopa 20:04:33

Hey team! I want to create a worker that tails the Postgres WAL and does some work. As a first step, I wanted to create an abstraction that handles connecting to the PG WAL, reading data, parsing it, and passing it along. future + core.async was my first idea: I could hide this complexity and just return a channel that receives WAL records. Here's the code:

(defn subscribe> [pool config slot-name]
  (let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
        replication-conn (get-pg-replication-conn config)
        stream (start-replication-stream replication-conn slot-name lsn)
        ch (chan)]
    (future
      (try
        (loop []
          (let [buffer (.read stream)
                record (wal-buffer->record buffer)
                last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
            (when (>!! ch record)
              (.setAppliedLSN stream last-receive-lsn)
              (.setFlushedLSN stream last-receive-lsn)
              (recur))))
        (catch Exception e
          (>!! ch e)
          (a/close! ch)))
      (.close replication-conn)
      (.close stream)
      (delete-replication-slot! pool slot-name))
    ch))
In brief:
• Set up all the pg-related replication stream stuff
• Parse the stream, converting the ByteBuffer to an object
• Pass that along to a channel
• Return the channel
• If a caller wants to stop listening, they can close the channel. This will trigger some bookkeeping here (closing replication-conn, stream, and deleting the replication slot)

I am a noob in core.async though. I am not sure how idiomatic this is, or if there are any gotchas. Thoughts much appreciated!
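For illustration, here is a minimal sketch of how a caller might consume and then stop a channel like the one described above. `fake-subscribe>` and `take-n-records` are hypothetical names, and the fake producer stands in for the real replication stream. One possible gotcha it tries to show: as far as I understand core.async, `close!` releases pending takes but not pending puts, so a producer already blocked in `>!!` only wakes up once its value is taken. The sketch therefore drains the channel after closing it.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for subscribe>: emits fake records until the
;; consumer closes the channel.
(defn fake-subscribe> []
  (let [ch (a/chan)]
    (future
      (loop [n 0]
        ;; >!! returns false once ch is closed, which ends the loop.
        (when (a/>!! ch {:lsn n})
          (recur (inc n)))))
    ch))

(defn take-n-records
  "Reads up to n records from ch, then closes it to tell the producer to stop."
  [ch n]
  (try
    (loop [acc []]
      (if (= (count acc) n)
        acc
        (let [v (a/<!! ch)]
          (cond
            (nil? v) acc                       ; channel closed by someone else
            (instance? Throwable v) (throw v)  ; producer reported an error
            :else (recur (conj acc v))))))
    (finally
      (a/close! ch)
      ;; Drain any put that was already pending so the producer is released.
      (while (some? (a/<!! ch))))))

(take-n-records (fake-subscribe>) 3)
;; => [{:lsn 0} {:lsn 1} {:lsn 2}]
```

With the real stream there is a second delay to keep in mind: the producer only notices the closed channel on its next `>!!`, so if `.read` is blocked waiting for a WAL record, cleanup would not run until a record arrives.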

stopa 19:04:52

Current edit:

(defn- consume-stream
  [stream cb]
  (loop []
    (let [buffer (.read stream)
          record (wal-buffer->record buffer)
          last-receive-lsn ^LogSequenceNumber (.getLastReceiveLSN stream)]
      (cb nil record)
      (.setAppliedLSN stream last-receive-lsn)
      (.setFlushedLSN stream last-receive-lsn)
      (recur))))

(defn subscribe
  [pool config slot-name cb]
  (let [{:keys [lsn]} (create-logical-replication-slot! pool slot-name "wal2json")
        replication-conn (get-pg-replication-conn config)
        stream (start-replication-stream replication-conn slot-name lsn)
        fut (fut-bg
             (try
               (log/infof "[wal] starting replication stream: %s" slot-name)
               (consume-stream stream cb)
               (catch Exception e (cb e nil))
               (finally
                 (log/infof "[wal] closed replication stream: %s" slot-name)
                 (.close replication-conn)
                 (.close stream)
                 (delete-replication-slot! pool slot-name))))]

    (fn stop []
      (.close stream)
      @fut)))
I realized a callback works just as well here and is simpler, so I removed the use of core.async. I also separated the logic into two functions.
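A rough sketch of how the callback version might be used from the caller's side. `fake-subscribe`, `handle-record`, `seen`, and `got-three` are hypothetical names. The fake producer only mimics the shape of `subscribe`: it calls `(cb nil record)` per record, calls `(cb err nil)` on failure, and returns a stop function.

```clojure
;; Hypothetical stand-in for `subscribe`: feeds fake records to cb on a
;; background thread and returns a stop function.
(defn fake-subscribe [cb]
  (let [running (atom true)
        fut (future
              (try
                (loop [n 0]
                  (when @running
                    (cb nil {:lsn n})
                    (Thread/sleep 1) ; pretend to wait for the next WAL record
                    (recur (inc n))))
                (catch Exception e
                  (cb e nil))))]
    (fn stop []
      (reset! running false)
      @fut)))

(def seen (atom []))
(def got-three (promise))

;; Node-style callback: err is non-nil on failure, record is non-nil otherwise.
(defn handle-record [err record]
  (if err
    (println "replication failed:" (ex-message err))
    (when (= 3 (count (swap! seen conj record)))
      (deliver got-three true))))

(def stop (fake-subscribe handle-record))
@got-three ; wait until three records have arrived
(stop)     ; blocks until the background loop has finished
(println (take 3 @seen))
;; prints ({:lsn 0} {:lsn 1} {:lsn 2})
```

Two things follow from the shape of the code above: the callback runs on the background thread, so a slow callback delays the LSN acknowledgement, and an exception thrown inside the callback is caught by the surrounding `try` and handed back to the same callback as `err`.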