I'm under the impression that I should not do side-effects inside the transducer passed to clojure.core.async/chan, and that would include swap!ing to an atom, correct?


swapping might be ok, but keep in mind that the transducer is operating within the context of a channel lock


you want all those mutexed operations to get in and get out ASAP


.........what are you trying to do, from a non-mechanism perspective?


I've got a send-off style queue and I need to report to a user (browser UI) the number of jobs buffered in that queue when they attempt to enqueue their job (via offer). I'm updating an atom to track the queue state prior to and after processing.


(Let me know if that's not clear)


and you are doing something like incrementing a counter before putting into the channel and then expecting a transducer to decrement the counter?


send-off style queue means an unbounded queue?


hmm. I should have dropped the word 'send-off'. It's buffered


I was considering having the xform on the queue chan swap to the monitor atom prior to processing.


transducer will run when the buffer accepts an item
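

A small REPL sketch of that timing, assuming a fixed buffer of 2 and no consumer; `accepted` and `jobs` are made-up names for illustration. The counter is bumped inside the xform only to observe when it runs (it still executes under the channel lock, so it is kept trivial). The third offer! should be rejected without the transducer ever seeing it:

```clojure
(require '[clojure.core.async :as async])

;; `accepted` is a hypothetical counter, used only to observe when the xform runs.
(def accepted (atom 0))

;; Fixed buffer of 2; the transducer bumps the counter for every item it sees.
(def jobs (async/chan 2 (map (fn [job] (swap! accepted inc) job))))

;; No consumer is attached, so only the first two offers fit in the buffer.
(def results (mapv #(async/offer! jobs %) [:a :b :c]))

;; Print the offer! results alongside the number of items the xform saw.
(println results @accepted)
```

The count tracks what the buffer accepted, not what was attempted, which is why it looks usable as a "jobs queued" number at first glance.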


I believe that's what I want. I was wondering if I could swap in the xform to avoid chaining a go-loop that does that (skipping some detail, because that puts me in a situation where it's more difficult for me to reason about the max buffer size of the chain). Maybe I can throw together a little sample code.


I think it will be easier if, instead of using core.async's buffers, you used unbuffered in and out channels, then wrote a go loop that copied from in to out, using something like a j.u.c. (java.util.concurrent) queue as a buffer


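(require '[clojure.core.async :as async])

(fn [in out max-buffer report]
  (let [q (java.util.concurrent.LinkedBlockingQueue.)]
    (async/go-loop [closed? false]
      (let [ops (vec (concat
                      ;; accept from `in` only while it is open and there is room
                      (when (and (not closed?)
                                 (> max-buffer (count q)))
                        [in])
                      ;; offer the head of the queue to `out` when it is non-empty
                      (when (pos? (count q))
                        [[out (.peek q)]])))]
        ;; alts! needs at least one operation; stop once `in` is closed and q is drained
        (when (seq ops)
          (let [[the-val the-chan] (async/alts! ops)]
            (cond
              ;; new item arrived: buffer it and report the new size
              (and (= the-chan in) (some? the-val))
              (do (.put q the-val)
                  ;; the put operator is >! (not !>)
                  (async/>! report (count q))
                  (recur closed?))
              ;; `in` was closed: keep draining the queue into `out`
              (and (= the-chan in) (nil? the-val))
              (recur true)
              ;; `out` accepted the head: remove it and report the new size
              (= the-chan out)
              (do (.take q)
                  (async/>! report (count q))
                  (recur closed?)))))))))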
very explicit, reports the size of the buffer any time it changes


Hmm. I'm unable to get the above to compile. Fails with

(err) Syntax error macroexpanding clojure.core.async/go at (monitor_step.clj:19:5).
(err) No such var: async
Similar error if I try fully qualifying the async namespace. I know there are gotchas around macros that use closures internally, but I don't see any of the usual suspects. I'm on clojure.core.async 1.5.648


ah! simple. the !> is reversed. I shoulda seen that


Wow, thanks. I'll need to exercise that at the REPL and think through it. I'm embarrassed to say I've never worked with java.util.concurrent, though I know it's really good stuff and I really should have by now. Here's what I was trying to describe in code:


So, is that a bad idea because of the swap! on the channel xform? (haha, or is it so filled with bad ideas that you don't know where to start ? :rolling_on_the_floor_laughing: )


that is a bad idea because it is a race condition


there is nothing to say the value of monitor-state hasn't changed between your offer not being accepted and derefing it
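

To make the window concrete, here is a guess at the shape of the racy code. The original snippet isn't reproduced here, so `jobs`, `enqueue!` and the result map are hypothetical; only `monitor-state` is a name from the conversation:

```clojure
(require '[clojure.core.async :as async])

(def monitor-state (atom 0))

;; Hypothetical queue: the xform counts every job the buffer accepts.
(def jobs (async/chan 1 (map (fn [job] (swap! monitor-state inc) job))))

(defn enqueue!
  "Hypothetical enqueue fn: offer the job, report the queue depth on rejection."
  [job]
  (if (async/offer! jobs job)
    {:status :queued}
    ;; Check-then-act: a worker may take a job (and update the atom) after the
    ;; failed offer! above but before the deref below, so the number reported
    ;; may describe a queue state that never coincided with the rejection.
    {:status :rejected :queued-ahead @monitor-state}))
```

With no worker running the result looks stable, which is what makes this kind of race easy to miss at the REPL.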


You know at first I thought the reporting piece could be inaccurate, since it's just supposed to give a user an idea of what's queued ahead of them. But racing is ugly, so I'll do something along the lines of what you suggested. thanks!