This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-01-27
Channels
- # announcements (24)
- # babashka (26)
- # beginners (8)
- # calva (8)
- # clojure (78)
- # clojure-europe (1)
- # clojure-norway (22)
- # clojurescript (14)
- # datascript (5)
- # datomic (8)
- # fulcro (22)
- # helix (9)
- # humbleui (11)
- # malli (4)
- # off-topic (28)
- # pedestal (5)
- # reitit (10)
- # shadow-cljs (2)
- # tools-build (8)
- # tools-deps (9)
I always assumed so, but I am chasing a bug that could be explained if it were in fact async
@U0NCTKEV8 got it. I do use ensure/alter but let me review to make sure
The logic is
(fn [signal-name {:keys [sender-id] :as args}]
(log/trace "rx:" signal-name args)
(dosync
(if (= signal-name acquire-chan)
(do
(log/trace "updating acquisition state: current waiters" @waiters)
(if-not (contains? (ensure waiters) sender-id)
(do
(alter queue conj args)
(alter waiters conj sender-id))
(do
(log/trace "duplicate"))))
(do
(log/trace "updating released state: current releases" @released)
(alter released conj signal-name)))))
so the only non-ensure-based read is the log entry, which I assume would not be material here
@U0NCTKEV8 does this mitigate the write-skew potential you mentioned?
I guess the related question is: I assume updating multiple refs in one dosync block is also ok
What are you using for logging? tools.logging uses agents to only log on transaction commit
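For reference, agent sends issued inside a dosync are held until the transaction commits, which is what makes agents safe for logging from transactions. A minimal sketch with hypothetical names (log-agent and counter are illustrative, not tools.logging internals):

```clojure
;; Sends to an agent inside a dosync are held until the transaction
;; commits, so a retried transaction never dispatches the send twice.
(def log-agent (agent nil))
(def counter (ref 0))

(dosync
  (alter counter inc)
  ;; only dispatched if/when this transaction commits
  (send log-agent (fn [_] (println "committed, counter =" @counter))))

(await log-agent) ; flush the agent's queue before reading its effects
```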
Something to keep in mind when debugging is the only way to get a consistent point in time read of multiple refs is using ensure in a dosync if you are inspecting the contents of refs trying to debug this
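A minimal sketch of that, with illustrative names: two bare derefs can observe states from different commits, while ensure inside one dosync yields a consistent point-in-time snapshot.

```clojure
;; Refs standing in for the state being debugged.
(def waiters (ref #{:a}))
(def queue   (ref [{:sender-id :a}]))

(defn snapshot
  "Read both refs at the same point in time."
  []
  (dosync
    {:waiters (ensure waiters)
     :queue   (ensure queue)}))
```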
@U0NCTKEV8 after thinking about what you said, I may see a problem in my code but I wanted to confirm. I am doing something like this (loop [] (let [m (first @queue)] … (dosync (alter queue rest))))
Either way, I am thinking that I should dequeue fully in the transaction, i.e.
(loop []
(let [m (dosync (let [-m (first (ensure queue))] (alter queue rest) -m))]
...))
first and rest are sequence operations; they coerce their arguments to a sequence before doing anything
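A quick REPL illustration of that coercion and what it does to conj:

```clojure
(rest [1 2 3])           ;=> (2 3)     a seq now, no longer a vector
(conj (rest [1 2 3]) 4)  ;=> (4 2 3)   conj prepends on seqs
(conj [1 2 3] 4)         ;=> [1 2 3 4] conj appends on vectors
```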
Related, my logic now looks like
(loop []
(let [m (ref nil)]
(dosync
(ref-set m (-> state ensure :queue first))
(alter state update :queue rest))
..
(recur)))
It seems to be working the way I expected:
(loop [x [1 2 3 4]]
(when-let [m (first x)]
(println m)
(recur (rest x))))
1
2
3
4
(loop [x (reduce (fn [acc i] (conj acc i)) [] (range 4))]
(when-let [m (first x)]
(println m)
(recur (rest x))))
0
1
2
3
Been a while, and my assumption was that ref-set was non-transactional, which is incorrect
I just meant I was expecting the LIFO nature of (rest) to cause an unexpected output, more than anything
but it appears to be in the order I expect for FIFO, so I'm just trying to reconcile what you were highlighting
If things are processed fast enough that there is never more than one thing in the queue, then it will look FIFO
so, I was using STM/refs to transactionally produce only if the de-dup criterion was met
In this case, it's a mutex construct built on top of Temporal…but you can think of the high-level design as an actor pattern with core.async channels
essentially the actor is the arbiter of the lock, and consumers send an “acquire” signal and wait for “acquired” to be sent back, and then they send “release” when finished
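A hypothetical core.async sketch of that arbiter (not the actual Temporal-based code; function names and message shapes are made up): one go-loop owns the lock state, clients send :acquire with a reply channel, wait for :acquired, and send :release when done.

```clojure
(require '[clojure.core.async :as a])

(defn start-arbiter
  "Start the arbiter loop; returns the request channel."
  []
  (let [requests (a/chan 16)]
    (a/go-loop [held?   false
                waiting clojure.lang.PersistentQueue/EMPTY]
      (when-some [{:keys [op reply-ch]} (a/<! requests)]
        (case op
          ;; grant immediately if free, otherwise queue the waiter
          :acquire (if held?
                     (recur held? (conj waiting reply-ch))
                     (do (a/>! reply-ch :acquired)
                         (recur true waiting)))
          ;; hand the lock to the next waiter, or mark it free
          :release (if-some [next-waiter (peek waiting)]
                     (do (a/>! next-waiter :acquired)
                         (recur true (pop waiting)))
                     (recur false waiting)))))
    requests))

;; usage: acquire, do work, release
(def requests (start-arbiter))
(def reply (a/chan))
(a/>!! requests {:op :acquire :reply-ch reply})
(a/<!! reply) ;=> :acquired
(a/>!! requests {:op :release})
```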
In general the stm is not used much and the most common cases for queues are better served by either java.util.concurrent queues or core.async channels
that's exactly what I was doing before I ran into a need to ensure there were no duplicate acquire signals queued
under certain failure scenarios, I cannot guarantee the idempotency of the acquisition signal, so I need to de-dup it and that led to this challenge
In any case, by this https://clojurians.slack.com/archives/C03S1KBA2/p1706460642147929?thread_ts=1706392967.187879&cid=C03S1KBA2 it seems like it's working the way I needed. IIUC, what you are saying is that more complex scenarios may break that?
in any case, the reason I was building a FIFO out of STM is solely because I needed to couple the dedup check atomically, and that rules out things like core.async or java.util.concurrent, because you can't produce side effects inside things like ref/atom update functions (they may be retried), so I was trying to do both the FIFO and the deduplication in an STM-friendly way
The way you avoid duplicates using a juc queue or channel is you put a process between the producer and the consumer that dedupes
With channels, depending on your needs, you can put that process inside the channel itself using a distinct or dedupe transducer
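A sketch of that, assuming an illustrative channel named acquire-ch: `distinct` drops anything seen before on this channel, while `dedupe` only collapses consecutive repeats.

```clojure
(require '[clojure.core.async :as a])

;; the transducer runs as values enter the channel's buffer,
;; so the dedup lives inside the channel itself
(def acquire-ch (a/chan 16 (distinct)))

(a/>!! acquire-ch {:sender-id 1})
(a/>!! acquire-ch {:sender-id 1}) ; duplicate, dropped by the transducer
(a/>!! acquire-ch {:sender-id 2})

(def first-msg  (a/<!! acquire-ch)) ;=> {:sender-id 1}
(def second-msg (a/<!! acquire-ch)) ;=> {:sender-id 2}
```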
Got it. This STM logic essentially was that process, but I can see how a core.async variant may be applied
I think I now understand what you were saying about rest: the problem is that the first time the consumer runs, it converts the vector to a seq, and then subsequent conj calls would be LIFO
I've adjusted the algorithm to now be
(defn poll
"called by the mutex workflow to consume any available signals"
[state]
(let [m (ref nil)]
(dosync
(ref-set m (-> state ensure :queue first))
(alter state update :queue (comp vec rest)))
@m))
If you insist then use clojure.lang.PersistentQueue/EMPTY with peek and pop instead of first and rest
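A quick sketch of PersistentQueue's FIFO behavior: conj enqueues at the back, peek/pop work at the front, and there is no seq coercion to flip the order.

```clojure
;; a real FIFO queue, built from the empty singleton
(def q (into clojure.lang.PersistentQueue/EMPTY [1 2 3]))

(peek q)         ;=> 1          front of the queue
(seq (pop q))    ;=> (2 3)      pop removes from the front
(seq (conj q 4)) ;=> (1 2 3 4)  conj adds at the back
```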
There is no guarantee that the read of m you are doing outside of the transaction will return the value you just wrote inside the transaction
Oh, that’s not good. I assume that is independent of the first/rest vs peek/pop conversation
If this ref is the only ref you have you can replace it with an atom and use swap-vals!
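A sketch of that swap-vals! pattern with illustrative names: swap-vals! returns [old new], so the dequeued head comes from the old value with no transaction-local ref needed.

```clojure
;; the queue as a single atom instead of a ref
(def signal-queue
  (atom (into clojure.lang.PersistentQueue/EMPTY [:acquire-a :acquire-b])))

(defn poll!
  "Atomically dequeue and return the head, or nil if empty."
  []
  (let [[old _new] (swap-vals! signal-queue pop)]
    (peek old)))

(def a1 (poll!)) ;=> :acquire-a
(def a2 (poll!)) ;=> :acquire-b
(def a3 (poll!)) ;=> nil  pop of an empty PersistentQueue returns it unchanged
```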