Hi!
I was just looking into transactions with the "serializable" isolation level, and it appears that with that it should be possible to build the equivalent of clojure.core/swap! but for databases instead of atoms: Read data from the DB, perform a computation, write data to the DB, and have this retried if the data in the DB changed between the read and the write. I looked into the docs, but could not find it for next.jdbc (nor for clojure.java.jdbc). Does such a function exist? If not, is it because it's a bad idea to build it?
depending on your database and the guarantees you're looking for, if you're writing to the same rows that you're reading, you might be able to use SELECT ... FOR UPDATE when you read the data to get the same effect
if you're using HoneySQL, for example, you can add a :for clause to your map:
https://github.com/seancorfield/honeysql/blob/develop/doc/clause-reference.md#for
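A minimal sketch of what that could look like with HoneySQL 2.x, assuming a hypothetical table some_table with an id column (both names are placeholders, not from this thread):

```clojure
(require '[honey.sql :as sql])

;; Hypothetical query map: lock the selected row until the transaction ends.
(def lock-row-query
  {:select [:*]
   :from   [:some_table]
   :where  [:= :id 10]
   :for    :update})

;; Renders a [sql-string & params] vector whose statement ends in FOR UPDATE.
(sql/format lock-row-query)
```

The lock only lasts for the duration of the surrounding transaction, so the formatted query would need to be executed inside something like jdbc/with-transaction to be useful.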
First of all, serializable is not a function but an isolation level. You can specify it using the transaction macro:
(jdbc/with-transaction [tx db {:isolation :serializable}]
  ...)
but: it increases the time and cost of committing transactions. Plus you should be ready to restart transactions if they result in a conflict
depending on what you are doing you might not need transactions at all: a simple update on a single row in a single table is atomic (although there can be some differences between SQL databases, so you have to be careful and verify this for yours)
Detecting issues and handling retries is DB-dependent.
E.g. for PostgreSQL you'd have to check for 40001 and 40P01 errors.
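As a rough sketch of the detect-and-retry part for PostgreSQL: the helper names retryable? and with-retry are made up for illustration, and a real version would probably want a backoff between attempts.

```clojure
(import '(java.sql SQLException))

;; SQLSTATE codes PostgreSQL reports for conflicts that are safe to retry.
(def retryable-states #{"40001"    ; serialization_failure
                        "40P01"})  ; deadlock_detected

(defn retryable?
  "True if e is a SQLException carrying one of the retryable SQLSTATEs."
  [e]
  (and (instance? SQLException e)
       (contains? retryable-states (.getSQLState ^SQLException e))))

(defn with-retry
  "Calls (f) up to max-attempts times, retrying only on retryable SQL errors.
  Any other exception, or a retryable one on the last attempt, is rethrown."
  [max-attempts f]
  (loop [attempt 1]
    ;; recur is not allowed inside catch, so the outcome is returned as data.
    (let [outcome (try
                    {:value (f)}
                    (catch SQLException e
                      (if (and (retryable? e) (< attempt max-attempts))
                        ::retry
                        (throw e))))]
      (if (= ::retry outcome)
        (recur (inc attempt))
        (:value outcome)))))
```

Usage would be along the lines of (with-retry 5 #(jdbc/with-transaction [tx ds {:isolation :serializable}] ...)), so that the whole transaction body is re-run on a conflict.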
And as was said above, for an atomic read/update you should open a transaction, select the row using SELECT ... FOR UPDATE, then update it and commit
I've done this sort of thing a fair bit, including implementing IAtom (the atom interface) over db rows, with just something like update some-table set foo=1 where foo=0 and id = 10
the returned count of updated rows tells you if the update succeeded or not
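A sketch of that conditional update with next.jdbc, assuming a table some_table with columns id and foo (the function names are invented for this example):

```clojure
(require '[next.jdbc :as jdbc])

(defn cas-succeeded?
  "next.jdbc reports the affected row count under :next.jdbc/update-count."
  [result]
  (= 1 (:next.jdbc/update-count result)))

(defn cas-foo!
  "Sets foo to new-val only if it is still old-val. Returns true if this
  call won the race and false if the row had already changed."
  [connectable id old-val new-val]
  (cas-succeeded?
   (jdbc/execute-one! connectable
                      ["UPDATE some_table SET foo = ? WHERE foo = ? AND id = ?"
                       new-val old-val id])))
```

Since it is a single statement, this does not need an explicit transaction; the WHERE clause does the comparison and the update count tells you whether the set happened.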
Here's the code that I ended up using, with app-specific bits removed:
;; Assumed requires/imports (omitted from the original snippet).
;; `log` is assumed to be clojure.tools.logging.
(require 'next.jdbc
         'next.jdbc.transaction
         '[clojure.tools.logging :as log])
(import '(clojure.lang ExceptionInfo)
        '(java.sql Connection SQLException)
        '(org.postgresql.util PSQLState))

;; Disallow nested next.jdbc transactions globally.
(alter-var-root #'next.jdbc.transaction/*nested-tx* (constantly :prohibit))

(def ^:dynamic *tx-id* nil)

(defn get-real-connection [c]
  (or (:connectable c) c))

;; SQLSTATEs worth retrying: 40001 (serialization failure), 40P01 (deadlock).
(def retryable-sql-states
  (into #{}
        (map #(.getState ^PSQLState %))
        [PSQLState/SERIALIZATION_FAILURE
         PSQLState/DEADLOCK_DETECTED]))

(defn with-transaction' [sym transactable opts body]
  `(binding [*tx-id* (or *tx-id* (str (gensym "tx")))]
     (let [txable# ~transactable
           opts# (assoc ~opts :isolation :serializable)]
       (if (instance? Connection (get-real-connection txable#))
         ;; Already on a connection, i.e. inside a transaction: just run the body.
         (let [~sym txable#]
           (do ~@body))
         ;; Otherwise run the body in a new transaction, retrying on conflicts.
         (loop []
           (let [{:keys [~'retry? ~'result]}
                 (try
                   {:result (next.jdbc/with-transaction+options [~sym txable# opts#]
                              ~@body)}
                   ;; ::wrapped presumably marks SQL exceptions wrapped by app code not shown here.
                   (catch ExceptionInfo ~'e
                     (let [~'data (ex-data ~'e)]
                       (if (::wrapped ~'data)
                         (let [~'cause (ex-cause ~'e)]
                           (if-let [~'state (when (instance? SQLException ~'cause)
                                              (retryable-sql-states (.getSQLState ^SQLException ~'cause)))]
                             (do
                               (log/info (str "Retrying transaction [" *tx-id* "] due to " ~'state))
                               {:retry? true})
                             (do
                               (tap> [::error ~'cause (dissoc ~'data ::wrapped)])
                               (throw ~'e))))
                         (throw ~'e)))))]
             (if ~'retry?
               (do
                 (Thread/sleep 100)
                 (recur))
               ~'result)))))))

(defmacro with-ro-tx [[sym transactable opts] & body]
  (with-transaction' sym transactable `(assoc ~opts :read-only true) body))

(defmacro with-rw-tx [[sym transactable opts] & body]
  (with-transaction' sym transactable `(assoc ~opts :read-only false) body))
For larger context: I'm looking into clojure.core/swap! semantics, because the "computation" I have to perform cannot (easily) be done on the DB. I load some rows, select one of them based on a computation, then rewrite the whole table (move a flag from one row to another) or do nothing. Afterwards, if I moved the flag / rewrote the table, I need to schedule some expensive / destructive operation, that I only want to schedule if the content of the table changed. I'd like this to be concurrency safe -- my service runs with multiple replicas and the database is the only thing they share.
the real issue if you want it to be 100% no wiggle room safe is the time between transaction commit and the scheduling of the expensive operation (assuming that the scheduling isn't in the database as well)
I think your mental model of how swap! works doesn't exactly match what that would mean in a database with transactions
swap! runs a loop like (loop [] (let [data @a] (when-not (compare-and-set! a data (f data)) (recur))))
where compare-and-set! atomically sets the value of a to whatever (f data) returned if the current value is data
so if you were going to implement atom-like semantics using transactions, the compare-and-set! is what you would implement, not the derefing (reading data from sql)
the nice thing about that is your function f doesn't run inside a transaction
the downside is that if f is side-effectful, those side effects could be repeated on each retry
and I say you might have a mismatch because I suspect you want the read (the deref) to be part of the transaction too, which is stricter than atoms
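To make the loop above concrete, here is a small sketch on a plain atom. swap-like! is a made-up name, and unlike the one-liner above it returns the new value the way swap! does:

```clojure
(defn swap-like!
  "Re-implements the core of swap! using compare-and-set!.
  f may be called more than once if another writer gets in between."
  [a f]
  (loop []
    (let [old-val @a
          new-val (f old-val)]
      (if (compare-and-set! a old-val new-val)
        new-val
        (recur)))))

;; Simulate a concurrent writer that changes the atom during the first call
;; to f, which forces exactly one retry.
(def calls (atom 0))
(def state (atom 0))

(defn racy-inc [v]
  (when (= 1 (swap! calls inc))
    (reset! state 10))
  (inc v))

(swap-like! state racy-inc)
```

After this runs, racy-inc has been called twice and state holds 11: the first attempt computed 1 from the stale value 0 and was discarded, which is exactly the repeated-side-effect behaviour mentioned above.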
Thanks! I think that helps. Focussing on the compare-and-set! part might simplify my problem.
I was roughly aware how swap! works and how it re-runs the function incl. side effects and I think it's what I need: Run the read/compute/write with such a loop until it "settles", similar to swap!, and then return the ID of the row we moved the flag to (or nil if the flag was already in the expected position), and if we get a non-`nil` value from that swap!-equivalent, we schedule the expensive / destructive operation. But maybe there is a simpler way without involving transactions.
The key thing about CAS is you need to be able to specify what the old value and what the new value is, and in the transaction check that the current value is still the old value, then change it to the new value
and I think trying to do that with multiple rows is going to be an issue
cas will have to, in the transaction, basically re-run the read query to verify the results didn't change, so unless your f is a monster (long runtime) it might be better just to do a single read, modify, write cycle inside a transaction (the tx will still need retrying of course)
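A rough sketch of that single read, modify, write cycle with next.jdbc. The table candidates, its columns id and flagged, and the function names are all assumptions made for illustration; choose-row stands in for the computation that cannot be done in the DB, and the UPDATE uses PostgreSQL's boolean expression syntax.

```clojure
(require '[next.jdbc :as jdbc]
         '[next.jdbc.result-set :as rs])

(defn plan-move
  "Pure decision step: returns the id the flag should move to,
  or nil when the flag is already on the chosen row."
  [rows choose-row]
  (let [current (:id (first (filter :flagged rows)))
        target  (choose-row rows)]
    (when (not= current target)
      target)))

(defn move-flag!
  "Reads, decides and writes inside one serializable transaction.
  Returns the new flagged id, or nil if nothing changed."
  [ds choose-row]
  (jdbc/with-transaction [tx ds {:isolation :serializable}]
    (let [rows (jdbc/execute! tx
                              ["SELECT id, flagged FROM candidates ORDER BY id"]
                              {:builder-fn rs/as-unqualified-lower-maps})]
      (when-let [target (plan-move rows choose-row)]
        ;; Set the flag on the target row and clear it everywhere else.
        (jdbc/execute! tx ["UPDATE candidates SET flagged = (id = ?)" target])
        target))))
```

The caller would still need to re-run move-flag! when the commit fails with a serialization error, and would only schedule the expensive operation when a non-nil id comes back.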