#sql
2022-08-29
kirill.salykin15:08:11

Hi, I am trying to use clojure.java.jdbc reducible-query to perform batched data updates. If I understand the approach correctly, I need to collect batches inside the reducing function and, once the batch is full (e.g. (= (count batch) batch-count)), do some inserts. That seems to work OK, but there is a problem with the last batch: if it has fewer records than the batch size, it is never processed. Please advise: is there a way to solve this? I need to figure out whether a record is the last one from reducible-query. Is it possible to utilize reduced here? Or maybe there is another way of going from the reducible to a lazy-seq? The requirement is to avoid loading the entire dataset into memory. Sorry for such a long question.

dpsutton17:08:52

(transduce (comp (partition-all 1000)
                 (map (fn [batch]
                        ;; this could do arbitrary stuff to your batch
                        (count batch))))
           conj
           (range 3500))
wouldn’t this work for you?

dpsutton17:08:02

just use partition-all in a transducer?

dpsutton17:08:40

and look in the source of partition-all. It uses the completion arity to check if there is an unfinished partition and operate on it
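
To make the completion-arity point concrete, here is a minimal sketch of a partition-all-style transducer. It is a simplification for illustration, not the clojure.core source, and the name batching is hypothetical. The 1-arity flushes any leftover partial batch before completing, which is why the final short batch is not lost.

```clojure
(defn batching
  "Simplified sketch of a partition-all-style transducer (hypothetical name)."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ;; completion arity: flush a leftover partial batch, then complete
        ([result]
         (let [leftover @buf
               result   (if (seq leftover)
                          (do (vreset! buf [])
                              (unreduced (rf result leftover)))
                          result)]
           (rf result)))
        ;; step arity: buffer the input and emit once the batch is full
        ([result input]
         (let [batch (vswap! buf conj input)]
           (if (= n (count batch))
             (do (vreset! buf [])
                 (rf result batch))
             result)))))))

(def batch-sizes
  (into [] (comp (batching 1000) (map count)) (range 3500)))
;; batch-sizes is [1000 1000 1000 500]
```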

dpsutton17:08:23

or even better

(transduce (partition-all 1000)
           (fn
             ([] nil)
             ([_] (println "done uploading"))
             ([_ batch]
              (println (count batch))))
           (range 3500))

kirill.salykin18:08:16

The tricky part here is reducible-query. The connection/statement is open only during the reduce phase (i.e. around the reducing function), so all work has to be done within the reducing function. Constructing a batch in the accumulator is easy; the only tricky part is the last records. I need a way, within the reducing function, to figure out that I am dealing with the last record so I can do a batch insertion.

dpsutton18:08:48

my transduce example follows that i believe?

kirill.salykin18:08:18

Is transduce compatible with IReduce?

dpsutton18:08:33

yes it uses it

dpsutton18:08:48

(let [f (xform f)
      ret (if (instance? clojure.lang.IReduceInit coll)
            (.reduce ^clojure.lang.IReduceInit coll f init)
            (clojure.core.protocols/coll-reduce coll f init))]
  (f ret))
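
As a quick check of that branch, the sketch below runs transduce over a hand-written IReduceInit. The fake-rows object is a hypothetical stand-in for a reducible query result and makes no database calls; it only shows that transduce drives such an object through its reduce method and still delivers the short final batch.

```clojure
(def fake-rows
  ;; hypothetical stand-in for a reducible query result: yields 3500 "rows"
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (loop [acc init, i 0]
        (if (or (reduced? acc) (= i 3500))
          (unreduced acc)
          (recur (f acc {:id i}) (inc i)))))))

(def sizes
  (transduce (comp (partition-all 1000) (map count)) conj [] fake-rows))
;; sizes is [1000 1000 1000 500]
```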

dpsutton18:08:50

check out the source

kirill.salykin18:08:51

Then I believe your example should work

kirill.salykin18:08:15

Thanks! I'll give it a try tomorrow

kirill.salykin18:08:44

(My transduce-fu is not that good)

seancorfield19:08:55

Pretty sure @U0NCTKEV8 hit this issue as well but I can't remember what the resolution was (the completion arity of partition-all is/was invoked outside the reducing context so the result set is already closed).

seancorfield19:08:05

He reminded me that I documented it for next.jdbc https://github.com/seancorfield/next-jdbc/commit/91dda2cdae3f9e897a54fe21edf8467acae8aa0d @U1V7K1YTZ and the same thing applies to c.j.j's reducible-query

dpsutton19:08:22

this is a consequence of partition-all calling (rf result v) in the completion arity?

dpsutton19:08:47

it attempts to get the last partition after the completion arity chain has begun being called?

hiredman19:08:21

It is a combination of things, I believe mostly related to the sort of lazy maps that next.jdbc uses

hiredman19:08:27

So I don't think it is going to be directly comparable to issues with clojure.java.jdbc

seancorfield19:08:26

I don't remember the details of c.j.j's reducible-query but I would expect it to fall foul in the same way as next.jdbc does with stateful transducers...

seancorfield19:08:59

It uses the same kind of "mapified" result set abstraction...
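
The ordering being discussed can be reproduced without a database. In the sketch below, closing-rows is a hypothetical reducible that marks a resource as open only while its reduce method is running, which is an assumption standing in for how a result set is held open. The log records whether the resource was open when each batch reached the reducing function.

```clojure
(def open? (atom false))
(def log (atom []))

(def closing-rows
  ;; hypothetical reducible: the "resource" is open only inside reduce
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (reset! open? true)
      (try
        (loop [acc init, i 0]
          (if (or (reduced? acc) (= i 5))
            (unreduced acc)
            (recur (f acc i) (inc i))))
        (finally (reset! open? false))))))

(transduce (partition-all 2)
           (fn
             ([] nil)
             ([_] nil)
             ;; record each batch together with the resource state
             ([_ batch] (swap! log conj [batch @open?]) nil))
           closing-rows)
;; @log is [[[0 1] true] [[2 3] true] [[4] false]]
```

The last batch arrives from the completion arity, after reduce has returned. With plain values, as here, that is harmless; it would only matter if the batched rows were still views over the closed resource.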

kirill.salykin19:08:55

Thanks a lot - will look and try tomorrow

kirill.salykin10:08:10

I ended up with something like this:

(let [last-batch (->> (jdbc/reducible-query ...)
                      (reduce (fn [acc row]
                                (if (= (count acc) batch-size)
                                  (do (work acc)
                                      ;; "reset" the acc, keeping the current row
                                      ;; so it is not dropped
                                      (conj! (transient []) row))
                                  (conj! acc row)))
                              (transient [])))]
  (work last-batch))

kirill.salykin10:08:43

so I just handle the last batch as an additional action, and that's it

seancorfield15:08:04

Where do you convert the transient back to persistent? I also think (work last-batch) will fail in some situations, because the result set will have been closed before you process all of it -- I think this is a very fragile solution.

kirill.salykin10:09:33

> I also think (work last-batch) will fail in some situations, because the result set will have been closed before you process all of it -- I think this is a very fragile solution.
Maybe I am mistaken, but I think last-batch will contain mapified data and thus wouldn't need an open connection. So far it has worked for me (but only in tests, though).

kirill.salykin10:09:52

> Where do you convert the transient back to persistent?
inside work
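
For comparison, here is a minimal sketch of the same "flush the leftover after the reduce" idea using a persistent vector, so no transient conversion is needed. The names process-in-batches! and realize-row are hypothetical. realize-row is assumed to be whatever copies a row into plain data for the library in use, so the leftover batch does not depend on an open result set; whether that copy is needed for reducible-query was not settled in this thread.

```clojure
(defn process-in-batches!
  "Hypothetical helper: calls (work batch) with vectors of up to n rows.
   realize-row is assumed to detach a row from the underlying result set."
  [n realize-row work reducible]
  (let [leftover (reduce (fn [batch row]
                           (let [batch (conj batch (realize-row row))]
                             (if (= n (count batch))
                               (do (work batch) []) ; full batch: flush, reset
                               batch)))
                         []
                         reducible)]
    ;; the final, possibly short, batch is handled after the reduce
    (when (seq leftover)
      (work leftover))))

;; usage with plain data standing in for query rows
(def seen (atom []))
(process-in-batches! 3 identity #(swap! seen conj %) (range 7))
;; @seen is [[0 1 2] [3 4 5] [6]]
```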