#code-reviews
2015-07-10
devon18:07:44

Hey all, I'm trying to write my first transducer. I'm processing an endless channel of data and would like to simply do something every n elements processed. Does this look right?

(defn do-every
  "Returns a transducer that executes f every n steps."
  [n f]
  (fn [rf]
    (let [i (volatile! 0)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (when (zero? (mod (vswap! i inc) n))
           (f))
         (rf result input))))))
I copied the volatile! from an example on http://clojure.org/transducers, but, I admit, I'm not entirely sure what it does...
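
One way to sanity-check a transducer like this is to run it over a finite collection with `into` before attaching it to a channel. The sketch below repeats `do-every` from the message above so that it runs on its own; the `calls` atom and the names `xf`, `first-run`, and `second-run` are made up for illustration. It also suggests that the counter lives per transduction, because the `volatile!` is created each time the transducer is applied to a reducing function.

```clojure
;; do-every copied from the message above so the snippet is self-contained.
(defn do-every
  "Returns a transducer that executes f every n steps."
  [n f]
  (fn [rf]
    (let [i (volatile! 0)]          ; fresh counter per transduction
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (when (zero? (mod (vswap! i inc) n))
           (f))
         (rf result input))))))

;; Hypothetical side effect: count how often f fires.
(def calls (atom 0))
(def xf (do-every 3 #(swap! calls inc)))

;; Inputs pass through unchanged; f fires on the 3rd, 6th and 9th element.
(def first-run (into [] xf (range 10)))

;; A second run starts counting from zero again, so f fires once more
;; (on the 3rd of 5 elements).
(def second-run (into [] xf (range 5)))
```

After both runs, `@calls` should be 4: three calls from the first run and one from the second.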

alejandro20:07:35

@devon: have you tried partition-all?

alejandro20:07:26

I may have misunderstood, were you just trying to implement similar behavior?

alejandro20:07:32

nvm, it’s just side-effecting f with no args

alejandro20:07:04

looks reasonable. I think of volatile as an atom that can only be written to from a single thread at a time, but is faster than an atom because of that. If you find good docs explaining its usage, please share!
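
For comparison, here is a minimal sketch of the volatile and atom operations side by side, as far as the Clojure 1.7 API goes. The names `v` and `a` are illustrative only. The key difference is that `vswap!` is a plain, non-atomic read-modify-write, while `swap!` uses compare-and-set and retries on contention.

```clojure
;; A volatile is a mutable box whose writes are visible to other threads,
;; but whose updates are NOT atomic.
(def v (volatile! 0))
(vswap! v inc)      ; non-atomic update, returns the new value
(vswap! v + 10)     ; extra args are passed to the function, like swap!
(vreset! v 5)       ; overwrite the value directly

;; An atom offers the same shape of API, but swap! is atomic
;; (compare-and-set with retry), which costs more per update.
(def a (atom 0))
(swap! a inc)
(reset! a 5)

;; Both are read with deref / @.
(def snapshot [@v @a])
```

Because `vswap!` is not atomic, a volatile is only a safe replacement for an atom when something else guarantees that one thread updates it at a time, which is the situation inside a single transduction.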

devon20:07:25

oh, that makes sense, great to know! thanks!

ghadi20:07:19

devon: note that channel transducers run inside the internal channel locks, so that should be a quick side-effect

ghadi20:07:40

not a potentially blocking one

ghadi20:07:17

if it is not in that category, you should probably run that function outside of a transducer
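
A rough sketch of the alternative, assuming core.async is on the classpath: do the periodic work in the consumer instead of in the channel's transducer, so that a slow or blocking `f` holds up only the consuming thread. `consume-every!` is a hypothetical helper, not part of core.async, and `ch`, `calls`, and `done` are illustrative names.

```clojure
(require '[clojure.core.async :as a])

(defn consume-every!
  "Hypothetical helper: takes every value from ch on a dedicated thread and
  calls f once per n values taken. Returns a channel that yields the total
  number of values taken once ch is closed."
  [ch n f]
  (a/thread
    (loop [i 0]
      (if (some? (a/<!! ch))        ; nil means ch is closed and drained
        (let [i (inc i)]
          (when (zero? (mod i n))
            (f))                    ; runs on this thread, outside the channel
          (recur i))
        i))))

;; Usage: a buffered channel, a counting side effect, ten values.
(def ch (a/chan 10))
(def calls (atom 0))
(def done (consume-every! ch 3 #(swap! calls inc)))

(doseq [x (range 10)]
  (a/>!! ch x))
(a/close! ch)

;; Blocks until the consumer has seen the channel close.
(def total (a/<!! done))
```

With ten values and n = 3, `total` should be 10 and `@calls` should be 3. The trade-off is that the side effect now depends on this particular consumer rather than on the channel itself.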

devon20:07:05

This one is, it's just logging. But, out of curiosity, is it any worse to have potentially blocking code in a transducer on the channel vs. in something that's consuming off of it? Assuming that only one thread is consuming?

ghadi20:07:47

it is worse because it's easy to forget about it and do it in multiple places in the application and generate surprising deadlocks

ghadi20:07:39

channel monitoring is on the roadmap, and I'm sure Rich and Stu have thoughts.

devon21:07:05

great to know, thanks!