#sql
2022-03-29
Leonid Korogodski 20:03:32

So, I have a setup where multiple SQL queries are executed in parallel, and we need to stream the results via a CSV formatter, then zip the resulting files, then upload them to S3. All of this must happen simultaneously: the upload starts as soon as the first bytes are zipped, the zipping starts as soon as the first CSV data appears, and the CSV formatting starts as soon as the data begin to come lazily from the db. For these purposes, we use Akka streams, calling Scala from Clojure. Akka makes sure that the data flows through the graph smoothly, with a backpressure-handling protocol, so that the actors only request the next data from a producer asynchronously when the consumer has the capacity to process it. As a result, the memory isn't supposed to grow, no matter how big the data returned by the queries is, with only a slice of it loaded into memory at any given time (like a river that transports a lot of water without overflowing its banks). However, we see a problem at the db end, which isn't Akka's doing. We use jdbc.core to query the database and produce a cursor, which is transformed into a lazy seq, and then we pass the iterator of the lazy seq to Source/fromIterator to create an Akka source (a node in the flow graph). Something like this:

(let [cursors (mapv #(jdbc.core/fetch-lazy connection % opts) sql-stmts) ;; could be multiple connections, too, depending...
      records (mapv #(.iterator (jdbc.core/cursor->lazyseq %)) cursors)]
  (...some code...))
If we didn't call .iterator, then the heads of the lazy seqs would continue to be referenced and therefore never garbage-collected. So, as we kept processing more data, even though we fetch it lazily in chunks of 4000 rows at a time, the lazy seqs would keep growing in memory, because the already-processed rows would not be released. However, even though we do call .iterator, the memory still grows at this end. Any idea why? And how to make sure that the heads get lost?
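For reference, handing such an iterator to Akka from Clojure might look roughly like this. This is a minimal sketch using Akka's Java DSL; the ->source name and the reify wrapper are illustrative assumptions, not the code from the thread:

(import '(akka.stream.javadsl Source)
        '(akka.japi.function Creator))

;; Wrap a java.util.Iterator in an Akka Source. Akka pulls the next
;; element only when downstream demand allows, so backpressure is
;; handled by the stream itself.
(defn ->source [^java.util.Iterator it]
  (Source/fromIterator
    (reify Creator
      (create [_] it))))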

hiredman 22:03:28

You say lazy seqs, but you are using mapv, which produces a vector, which is not lazy

hiredman 22:03:32

You might be running into a chunking issue: seqs produced from vectors are chunked and, depending on how they are consumed, may be processed 32 elements at a time instead of one at a time
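Chunking is easy to see at the REPL. Realizing one element of a seq over a vector realizes the whole 32-element chunk; this is a standard illustration, not code from the thread:

;; map over a vector-backed seq: asking for the first element forces
;; the whole first chunk of 32, running the side effect 32 times even
;; though only one value was requested.
(first (map #(do (print % " ") %) (vec (range 100))))
;; prints 0 1 2 ... 31 and returns 0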

hiredman 22:03:52

If by jdbc.core you mean https://funcool.github.io/clojure.jdbc/latest/api/jdbc.core.html, I would suggest moving away from that. It started as a clone of the Clojure contrib library clojure.java.jdbc, and there is some bad blood there about the author of clojure.jdbc copying and pasting code from clojure.java.jdbc

hiredman 22:03:42

The primary maintainer of clojure.java.jdbc has released a next-generation Clojure JDBC library, next.jdbc, which addresses a lot of shortcomings in clojure.java.jdbc (and presumably in clojure.jdbc as well)

Leonid Korogodski 22:03:48

Ah, good points. Thanks.

hiredman 22:03:07

The next.jdbc library is built from the ground up around using reduce for processing results, which can be better for resource usage
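In next.jdbc that pattern looks roughly like the sketch below; the ds datasource and the query are placeholders, not anything from the thread:

(require '[next.jdbc :as jdbc])

;; plan returns a reducible, not a realized collection: rows are
;; streamed from the ResultSet as the reduce consumes them, so the
;; full result set is never held in memory.
(reduce (fn [total row]
          (+ total (:amount row)))
        0
        (jdbc/plan ds ["SELECT amount FROM orders"]))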

Leonid Korogodski 22:03:59

But that mapv just creates a vector of lazy seqs, not a vector of vectors. Well, a vector of iterators for lazy seqs.

Leonid Korogodski 22:03:46

The problem with plan in next.jdbc is that, for my purposes, I don't want a reducible as a result. I need an iterator, so that it can be queried asynchronously.

Leonid Korogodski 22:03:51

A reducible can't be passed to Akka.

isak 23:03:14

How about just using prepare, then processing the rows manually? Maybe Clojure makes it too hard to control the execution precisely when using laziness, like hiredman was talking about. Could be a bug in your code or the library you are using. A similar option would be to reduce a plan for side-effects (like populating a queue or channel) if reading manually is too cumbersome.
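The second option might look something like the following rough sketch, under my own assumptions: ds, the query, and the queue size are placeholders, and the code that drains the queue is left out.

(require '[next.jdbc :as jdbc])
(import '(java.util.concurrent ArrayBlockingQueue))

;; Reduce a plan purely for its side effect: copy each row onto a
;; bounded queue. .put blocks when the queue is full, so the query is
;; only pulled from the database as fast as whatever drains the queue.
(def ^ArrayBlockingQueue row-queue (ArrayBlockingQueue. 1024))

(future
  (reduce (fn [_ row]
            (.put row-queue {:id (:id row) :amount (:amount row)}))
          nil
          (jdbc/plan ds ["SELECT id, amount FROM orders"])))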

Leonid Korogodski 23:03:33

But the side-effects wouldn't wait for the consumer's capacity to meet demand. The entire point is to delegate backpressure handling to the Akka framework.

isak 14:03:33

Why wouldn't they wait to meet demand?

Leonid Korogodski 16:04:37

They could. But that's a lot of fairly complex extra custom code, which is exactly what I'm using an established, production-ready framework in order to avoid.

hiredman 23:03:56

jdbc just isn't, uh, I guess push-based would be the way to describe it; with jdbc you pull the results from the database. There have been some attempts to make something more asynchronous, but nothing that has been standardized in the jdbc interface. I believe Oracle was even working on it at one point, but decided not to bother and wait for Project Loom