#clojure-dev
2017-09-12
reborg17:09:29

I was wondering if there is any reason why r/take-while, r/take, and r/drop in reducers don't use folder (instead of reducer), with the following practical effects:

(require '[clojure.core.reducers :as r])
(time (->> (range 50000)
           (into [])
           (r/map range)
           (r/mapcat conj)
           (r/drop 0)
           (r/filter odd?)
           (r/fold +)))
;; "Elapsed time: 45516.963356 msecs"
;; 10416041675000
(time (->> (range 50000)
           (into [])
           (r/map range)
           (r/mapcat conj)
           (r/filter odd?)
           (r/fold +)))
;; "Elapsed time: 9190.562896 msecs"
;; 10416041675000

hiredman17:09:53

they are stateful, and the state results from a linear scan, which doesn't parallelize as a tree

reborg17:09:17

not sure I get that. Each reduction happens on a single chunk on a single core. So would the results be different if the drop were foldable?

bronsa17:09:20

think about how take-while would behave if processing 2 chunks in parallel

bronsa17:09:48

as hiredman says, they are inherently linear operations
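
(A quick illustration of bronsa's point, using plain sequence functions rather than reducers: applying take-while to two chunks independently keeps elements that a take-while over the whole input would have cut off.)

(let [v     [1 3 2 5 7 9]
      [a b] (split-at 3 v)]
  [(take-while odd? v)                                ;; => (1 3)
   (concat (take-while odd? a) (take-while odd? b))]) ;; => (1 3 5 7 9)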

hiredman17:09:43

(def v (range 100))

(= (->> v
        (partition-all 32)
        (mapcat f))
   (drop 1 v))

hiredman17:09:22

is there some f that can safely be called in parallel that would result in true?
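
(One concrete candidate, a per-partition drop, shows why no such f exists; this instantiation of f is mine, not hiredman's. Dropping 1 from each partition of (range 100) drops 4 elements instead of 1.)

(->> v
     (partition-all 32)
     (mapcat #(drop 1 %))
     count)
;; => 96, but (count (drop 1 v)) => 99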

reborg17:09:31

Still trying to understand. Created:

(defn drop2
  [n coll]
  (r/folder
   coll
   (fn [rf]
     (let [nv (volatile! n)]
       (fn [result input]
         (let [n @nv]
           (vswap! nv dec)
           (if (pos? n)
             result
             (rf result input))))))))
and used in place of r/drop. What would you expect to happen/not-happen?
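
(For context, from the clojure.core.reducers source: r/folder applies the transform xf exactly once, at fold time, so the single reducing fn it produces, volatile included, is shared by every leaf reduction. This is what the rest of the thread turns on.)

(coll-fold [_ n combinef reducef]
  (coll-fold coll n combinef (xf reducef)))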

ghadi17:09:52

that has a clear race

ghadi17:09:05

volatiles are not atomic

hiredman17:09:17

it has many races

hiredman17:09:12

and it assumes things happen in order, which by definition they don't when running in parallel

ghadi17:09:58

actually, the volatile doesn't appear to escape each partition, so it's not a race, but it's not tracking a global count

ghadi17:09:53

anywhere the docstring says "stateful transducer" you have to be cognizant of threads.
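
(A sketch of why the sequential case is safe: each application of a stateful xform to a reducing fn allocates fresh state. Trouble starts only when one such instance is shared across threads.)

(let [xform (drop 2)
      rf1   (xform conj)
      rf2   (xform conj)]
  [(reduce rf1 [] [1 2 3 4])
   (reduce rf2 [] [1 2 3 4])])
;; => [[3 4] [3 4]] ; independent counters, one per application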

hiredman17:09:31

right, you end up dropping from each partition vs. dropping from the whole thing, which is an entirely different behavior from the non-folding drop

reborg17:09:32

drop2 returns the same results as r/drop, and it does run in parallel (as in fork-join parallel), but timings are much longer.

hiredman18:09:02

if you think it does then your test isn't correct

hiredman18:09:43

for example, you are passing in something that doesn't fold in parallel (like a seq), or you are dropping 0, which does nothing, etc.

hiredman18:09:32

or you are witnessing one possible interleaving

reborg18:09:45

If I (println (Thread/currentThread)) from inside (fn [result input]) I see fork-join threads. I thought that indicated it was going through fork-join correctly.

ghadi18:09:55

you'll be in a FJPool, but the code is not correct. What is your correctness test?

ghadi18:09:09

Perhaps we can tweak that to expose the problem

reborg18:09:55

I'm executing the fold from the beginning, alternating r/drop and drop2 with different initial ranges. I was expecting different results (even across repeats of the same run) in case of race conditions. Not sure that's a good definition of correctness

hiredman18:09:22

user=> (require '[clojure.core.reducers :as r])
nil
(defn drop2
  [n coll]
  (r/folder
    coll (fn [rf]
      (let [nv (volatile! n)]
        (fn
          ([result input]
           (let [n @nv]
             (vswap! nv dec)
             (if (pos? n)
               result
               (rf result input)))))))))
#'user/drop2
user=> 
user=> (def x (vec (range 10000)))
#'user/x
user=> (def y (->> x (drop2 10) (r/fold +)))
#'user/y
user=>  (def z (->> x (drop 10) (reduce +)))
#'user/z
user=> (= z y)
false
user=> z
49994955
user=> y
49969955
user=> 
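
(A quick arithmetic check on that run: (reduce + (range 10000)) is 49995000. z drops 0..9, whose sum is 45, giving 49994955. y falls short of the total by 25045, so in that particular run drop2 dropped elements summing to 25045, not the first ten.)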

ghadi18:09:51

there's a defequivtest macro in the test suite that checks for seq/reducer equivalence https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/reducers.clj

ghadi18:09:59

it or something like it may be helpful

reborg18:09:38

also interesting that (->> x (r/drop 10) (r/fold +)) is 49994955

hiredman18:09:19

instead of dropping the first 10 numbers, it is dropping 10 from every partition

reborg18:09:14

which is what I would expect

ghadi18:09:17

no, it's just the first 10 in those two examples

ghadi18:09:29

it's the 49969955 y result that is erroneous

hiredman18:09:47

yeah, no, that is not the contract for r/drop (and the others)

hiredman18:09:14

with pure functions, the behavior for reducers is expected to be exactly the same

hiredman18:09:35

so you think it is possible because you want to solve an easier problem 🙂

ghadi18:09:51

with most of c.c.reducers effectively obsoleted by transducers, I think it would be a useful exercise to translate some fold examples to use transducers instead

ghadi18:09:29

perhaps that will illustrate some flaws in fold that need adaptation.

ghadi18:09:47

(Like completion)
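
(On completion: r/fold reduces each leaf with a plain reduce and never calls the one-argument completing arity, which transducers like partition-all rely on to flush buffered state. transduce does call it:)

(transduce (partition-all 3) conj (range 5))
;; => [[0 1 2] [3 4]] ; the trailing [3 4] is emitted by the completion step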

reborg18:09:01

(r/fold ((drop 10) +) (vec (range 10000))) looks like it's returning different results each run?

ghadi18:09:30

the transducer is applied at the wrong time

ghadi18:09:42

that will work for something like map (stateless)
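
(A sketch of the stateless case, assuming the stock r/fold API: pre-applying (map inc) to + works, provided a plain + is supplied separately as combinef; using ((map inc) +) for combining too would also inc each partial sum.)

(r/fold + ((map inc) +) (vec (range 10000)))
;; => 50005000, same as (transduce (map inc) + (range 10000))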

ghadi18:09:08

fold needs to be in charge of transforming reducing functions

ghadi18:09:19

and do it as it partitions

ghadi18:09:23

what is happening in your scenario: you create one reducing function, ((drop 10) +), and it's passed to a bunch of threads that all use it in an unspecified order

ghadi18:09:39

they all racily/non-atomically modify the volatile

reborg18:09:06

it's kind of what I was expecting for that drop2 after thinking twice

reborg18:09:46

and yeah, same thing; drop2 suffers from the same problem

reborg18:09:29

starting to connect the dots, thanks for your time

ghadi18:09:34

my pleasure. the relationships between Transducers / Reducers / Fold have not been made explicit

reborg19:09:57

I think the "problem" is work stealing. A chunk can always end up on a thread where the volatile doesn't have the right count, because that thread has already processed another chunk

ghadi19:09:17

there is no way to make this work with volatiles because they lack atomicity
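
(A minimal demonstration of the non-atomicity: vswap! is a plain read-modify-write, so concurrent increments get lost, while swap! on an atom retries with compare-and-set.)

(let [v  (volatile! 0)
      a  (atom 0)
      fs (doall (repeatedly 8 #(future
                                 (dotimes [_ 10000]
                                   (vswap! v inc)
                                   (swap! a inc)))))]
  (run! deref fs)
  [@v @a])
;; => e.g. [57342 80000] ; the volatile typically loses updates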

reborg19:09:58

not even when the volatile is correctly closed over and the function is allocated every time?

ghadi19:09:40

then it wouldn't be a global count, but a local count

ghadi19:09:49

it would be a different function