#missionary
2023-11-07
reedho 12:11:32

From Leo's last talk, one major feature of missionary is that it assists with proper "resource management". This reminded me of something I thought about several months ago: how to use missionary to handle the [datalevin `range-seq` API](https://cljdoc.org/d/datalevin/datalevin/0.8.21/api/datalevin.core#range-seq). The API basically has to be used with `with-open`, as below:

(let [v (atom 0)]
  (with-open [kv-seq (d/range-seq kvdb "table" [:all] :uuid :integer false)]
    (doseq [_x kv-seq]
      (swap! v inc)))
  @v)
Below is my recent attempt to convert the seq to a flow. The goal is to consume the seq as a flow while still closing the underlying resource correctly.
(defn kv-flow
  [db]
  (let [kv-seq (d/range-seq db "table" [:all] :uuid :integer false)]
    (->> (m/observe
           (fn [cb]
             (.start
               (Thread.
                 (reify Runnable
                   (run [_]
                     (loop [xs kv-seq]
                       (if-let [x (first xs)]
                         (do (cb x)
                             (recur (try (next xs)
                                         (catch Exception _e
                                           (println _e "GOT-EXCEPTION")
                                           nil))))
                         (cb nil)))))))
             ;; cleanup
             (bound-fn []
               (println "CLEANUP")
               (.close kv-seq))))

         (m/eduction (take-while some?))
         )))

;; usage
(->> (kv-flow kvdb)
     (m/reduce conj)
     (m/?)
     (count))

There seems to be a race condition in the code above; see [this gist](https://gist.github.com/reedho/ecfd73d4e997e3ed04064b379c512362) for details. How can I avoid it? Is this approach correct in the first place?

xificurC 12:11:58

untested sketch of how I'd approach it

reedho 13:11:48

I tried that and it almost works; the total kv count in my test is 200_000. The code below works:

(->>
  (m/ap
    (let [rs (m/?> (m/observe
                     (fn [!]
                       (let [rs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
                         (! (seq rs))
                         #(.close rs)))))]
      (m/?> (m/seed rs))))
  (m/eduction (take 1000))
  (m/reduce conj)
  (m/?)
  (count)
  )
If I remove the eduction, or use a take value greater than the actual size, the flow won't finish.

reedho 13:11:55

Made it work this way:

(->>
  (m/ap
    (let [rs (m/?> (m/observe
                     (fn [!]
                       (let [rs (d/range-seq
                                  kvdb
                                  "table" [:all] :uuid :integer false)]
                         (! rs)
                         #(do (println "CLOSING") (.close rs))))))]
      (m/?>
        (m/amb (m/seed rs) (m/seed [nil])))))
  (m/eduction (take-while some?))
  (m/reduce conj)
  (m/?)
  (count))

reedho 13:11:30

I would love to see a better solution, if there is one 🙂

xificurC 14:11:12

indeed, the problem is that m/observe doesn't terminate after the range-seq is consumed

xificurC 14:11:34

your solution with a sentinel value (which could be something other than nil if you need to support nil as an output) will work. I scratched my head for a while and didn't come up with anything better. I'm sure there's a better solution, I just can't think of one
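A plain-Clojure sketch of the non-nil sentinel idea, kept free of missionary so it runs on its own. `end-of-stream`, `until-sentinel` and `with-sentinel` are hypothetical names: `with-sentinel` stands in for the trailing `(m/seed [nil])` branch of the `m/amb` in the working version above, and `until-sentinel` is the transducer that would take the place of `(take-while some?)` inside `m/eduction`. Because the marker is a fresh object compared with `identical?`, `nil` and `false` pass through as ordinary items.

```clojure
;; Hypothetical sentinel: a fresh Object is identical? only to itself,
;; so no real item (including nil or false) can be mistaken for it.
(def end-of-stream (Object.))

;; Transducer that stops at the sentinel (would replace (take-while some?)).
(def until-sentinel
  (take-while #(not (identical? % end-of-stream))))

;; Stand-in for (m/amb (m/seed rs) (m/seed [end-of-stream])):
;; all items, then exactly one trailing marker.
(defn with-sentinel [items]
  (concat items [end-of-stream]))

(into [] until-sentinel (with-sentinel [1 nil 2 false 3]))
;; => [1 nil 2 false 3]

;; With nil as the sentinel, the first nil item cuts the stream short.
(into [] (take-while some?) (concat [1 nil 2 false 3] [nil]))
;; => [1]
```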

reedho 14:11:09

I see. Now I have a new intuition about managing resources, thanks for the pointer.

leonoel 18:11:52

(loop [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)
       n 0]
  (if-let [x (first xs)]
    (recur (next xs) (inc n)) n))
=> 199899

leonoel 18:11:23

it is not a race condition, it is a bug in datalevin's range-seq implementation

leonoel 18:11:07

the count query is also wrong, because it counts the number of batches, not the number of items in all batches

leonoel 18:11:03

(with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
  (reduce (fn [r x] (+ r (count x))) 0 xs))
=> 200000
(with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
  (loop [xs xs
         n 0]
    (if-some [[x & xs] (seq xs)]
      (recur xs (inc n)) n)))
=> 200000
(with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
  (loop [xs xs
         n 0]
    (if-let [x (first xs)]
      (recur (next xs) (inc n)) n)))
=> 199899
this final snippet shows the bug: this iteration scheme should also give the correct answer

reedho 23:11:28

Thanks for digging deeper. Hmm.. yeah, I hadn't checked the loop (without missionary) before, interesting. So the issue with range-seq is that it gives a wrong result when iterated with loop, but a correct one when iterated with doseq or reduce. Is this a correct description of the bug? Also, given that the bug is in range-seq, is the former implementation of kv-flow idiomatic, and does it perform better than the latter?

reedho 00:11:56

I've added some notes (sorry if this is off-topic) on the behavior of the sequence returned by datalevin range-seq. The kvdb contains 200_000 items in total.

;; This correctly iterates over all the items
(->>
  (with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
    (doall (map identity xs)))
  count)
;; => 200000


;; Using `mapv` shows that each iteration value contains a batch of items
(->>
  (with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
    (mapv identity xs))
  count)
;; => 1981


;; This verifies the assumption above
(->>
  (with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
    (mapv identity xs))
  (mapcat identity)
  count)
;; => 200000
Is this "batched" lazy-sequence behavior common in general?
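The notes above can be reproduced without datalevin by a small mock. This is a hypothetical sketch (`mock-range-seq` is not a datalevin function) that only assumes the behavior observed in this thread: `reduce`, and therefore `mapv` and `vec`, receives one vector per batch, while `seq`, and therefore `map`, sees individual items. Unlike the real object, this mock can be traversed more than once.

```clojure
(defn mock-range-seq
  "Hypothetical stand-in for the object returned by datalevin's range-seq:
  reducing it yields one vector per batch, seq-ing it yields single items."
  [items batch-size]
  (let [batches (map vec (partition-all batch-size items))]
    (reify
      clojure.lang.IReduceInit
      (reduce [_ f init]
        ;; The reducing fn is handed whole batches, not items.
        (reduce f init batches))
      clojure.lang.Seqable
      (seq [_]
        ;; The seq view is flat, item by item.
        (seq items)))))

(def xs (mock-range-seq (range 10) 4))

(mapv identity xs)                                ;; => [[0 1 2 3] [4 5 6 7] [8 9]]
(count (mapv identity xs))                        ;; => 3  (batches)
(count (map identity xs))                         ;; => 10 (items)
(count (mapcat identity (mapv identity xs)))      ;; => 10
(reduce (fn [r batch] (+ r (count batch))) 0 xs)  ;; => 10
```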

reedho 01:11:00

After some trial and error, I found the "bug"; the gist is updated here: https://gist.github.com/reedho/ecfd73d4e997e3ed04064b379c512362
Doing (loop [xs (seq kv-seq)] ...) instead of (loop [xs kv-seq] ...), as well as (next (seq xs)) instead of (next xs), gives the correct result.

leonoel 09:11:55

OK, after more investigation, here is my conclusion:
• the object returned by range-seq is both reducible and seqable, BUT the former is chunked and the latter is not. This is undocumented and confusing

;; reduction returns chunks of items, seq returns flattened items
(not= (with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)] (vec xs))
    (with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)] (vec (seq xs))))
• it is not allowed to call a sequence function like seq, first or next more than once on the object returned by range-seq. The documentation says "It represents only one pass over the data range, and seq function needs to be called to obtain a persistent collection", so it is indeed implied that an instance cannot be reused. That explains why your first attempt failed and why calling seq upfront fixed it. Calling seq on subsequent iterations should not be required. I personally find this API design choice questionable, but it may not be considered a bug
;; UB : traversing the same range-seq instance twice
(with-open [xs (d/range-seq kvdb "table" [:all] :uuid :integer false)]
    (not= (vec xs) (vec xs)))
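A minimal plain-Clojure sketch of why a single-pass seqable breaks the `first`/`next` loop. `one-pass-seqable` and `count-first-next` are hypothetical names, and the mock's design (every call to `seq` starts a fresh lazy seq over one shared iterator) is an assumption that only approximates datalevin's internals. `first` and `next` each call `seq` when their argument is not already a seq, so the first loop iteration pulls two separate seqs from the same iterator and loses an item; calling `seq` once upfront avoids that. The real range-seq lost more items (199899 vs 200000), which depends on its own implementation.

```clojure
(defn one-pass-seqable
  "Hypothetical single-pass source: each call to `seq` builds a new lazy seq
  over the SAME iterator, so items pulled by an earlier call are gone."
  [items]
  (let [^java.util.Iterator it (.iterator ^Iterable items)]
    (reify clojure.lang.Seqable
      (seq [_]
        (letfn [(step []
                  (lazy-seq
                    (when (.hasNext it)
                      (cons (.next it) (step)))))]
          (seq (step)))))))

(defn count-first-next
  "Same first/next loop as the snippet earlier in the thread."
  [xs]
  (loop [xs xs
         n  0]
    (if-let [_x (first xs)]
      (recur (next xs) (inc n))
      n)))

;; (first xs) and (next xs) each call seq on the raw object,
;; so the second seq starts one item later and item 2 is skipped.
(count-first-next (one-pass-seqable (range 1 11)))        ;; => 9

;; Calling seq once upfront turns it into an ordinary seq.
(count-first-next (seq (one-pass-seqable (range 1 11))))  ;; => 10

;; A second traversal of the same instance finds nothing left.
(let [xs (one-pass-seqable (range 1 11))]
  [(count (seq xs)) (count (seq xs))])                     ;; => [10 0]
```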

šŸ‘ 2
reedho 09:11:24

Thank you, I learned a lot from your investigation. 🙏
