#sql
2020-09-10
antonmos20:09:52

@seancorfield hello! i am back with more questions about streaming. After you added support for folding the streaming result set, we put the change in production and eventually ran into "java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker". This happened because the ForkJoin framework backing r/fold has magic that detects blocking operations and adds threads to compensate (see Compensation Threads at http://moi.vonos.net/java/forkjoin/). Based on this quote:

> One special case of blocking operations has been optimised for work-stealing threadpools: the "synchronization primitive" classes in package java.util.concurrent recognise when a thread belonging to a work-stealing threadpool is about to perform a blocking operation (eg wait on a mutex). These classes then tell the associated threadpool to create an additional "compensation thread" before the current thread blocks. The number of "active threads" in the threadpool thus remains the same.
i looked at the thread dump and found where we were running into synchronization, and it turned out to be in https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/LazySeq.java#L50 which is not easily avoidable. So we had to revert the changes that used r/fold.

I am now back to wanting to stream the results, and again struggling to figure out how to do that. Note that now my main motivation is to reduce heap usage, i.e. i'd like the results of the query to be postprocessed, converted to transit and put on the wire continuously. It may be that streaming all the way through makes parallel processing unnecessary, but that remains to be seen.

The problem with plan is that i have to reduce over it. If i am not mistaken, even if i spin off futures to do the actual work, reduce will not return to the calling code until the entire result set has been traversed, which means i would not have avoided holding the entire result set in memory. Is there another next.jdbc function that i am missing? Thanks again for your help!

seancorfield20:09:50

plan is the only way to stream large result sets and process them as they are streamed in. That processing must be eager (since it must all happen before the connection is closed) -- otherwise you have to realize the result set into memory (which you don't want).

seancorfield20:09:17

You mention LazySeq but you don't say whether that was in your code or in next.jdbc?

antonmos20:09:53

it was in our code. actually it’s in clojure.walk which is used by camel-snake-kebab which we are calling

seancorfield21:09:08

Well... don't do that 🙂

seancorfield21:09:31

The point of plan is to lift values out of each row quickly, without realizing the rows, process those column values, and move on to the next row.
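For illustration, a minimal sketch of that style (the datasource ds, the table, and the :amount column are hypothetical). Keyword access on a plan row reads just that column, without realizing the row as a Clojure map:

(require '[next.jdbc :as jdbc])

;; Accumulate a small summary from a potentially huge result set:
;; only the :amount column is pulled out of each streamed row.
(reduce (fn [total row] (+ total (:amount row)))
        0
        (jdbc/plan ds ["select amount from invoice"]))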

antonmos21:09:49

i think i am misunderstanding what you mean by
> That processing must be eager -- otherwise you have to realize the result set into memory (which you don't want).
aren't those directly opposed? if i understand correctly, you are requiring the use of 'reduce' in order to prevent references to the result set from escaping. However, that is also exactly what's preventing me from starting to stream the results before streaming from the db is finished.

antonmos21:09:37

i can certainly build around it - access the db in a separate thread, put the rows onto a blocking queue, and consume it from the request thread

antonmos21:09:05

but it seems like i'd be wrestling with a limitation imposed by next.jdbc

seancorfield21:09:51

What does your reducing function do?

seancorfield21:09:27

You aren't producing a data structure from the result of reduce I hope?

antonmos21:09:36

does a bunch of cpu-bound calculations on the rows and adds the results to the rows

antonmos21:09:24

the intent is to get the rows out of the db, apply business logic and return them to the caller. indeed, it is to produce a data structure from 'reduce', not a scalar

seancorfield21:09:56

That can't possibly work if the result set is large enough to warrant streaming it from the DB.

antonmos21:09:04

can you clarify why you say that? note that the calculations are applied and added to each individual row.

seancorfield21:09:30

I don't know how to say it any clearer, sorry.

seancorfield21:09:32

If a result set won't fit in memory, you have to stream it from the DB, processing it eagerly (consuming it -- not producing a data structure because that would be too big to fit in memory).

seancorfield21:09:49

If your result set would fit in memory, you might as well just use execute!

seancorfield21:09:24

Reducing over plan works (with streaming) if what you produce is small enough to fit in memory.

antonmos21:09:44

i think we are on the same page there. however, the issue is that in my case "consuming" equals "writing to the http response"; but reduce requires waiting until the whole result set has been read from the db.

antonmos21:09:00

> Reducing over `plan` works (with streaming) if what you produce is small enough to fit in memory. ah yes. that is what i am saying as well

seancorfield21:09:04

If you need to stream the results to another process, then you can reduce over plan to put it on a queue that another process can consume.

seancorfield21:09:59

If you can stream results into your HTTP response, then reduce over plan works there -- with the reducing function pushing each (processed) row out to the HTTP response.

seancorfield21:09:31

(reduce (fn [_ row] (send (transformed row) to-output)) nil (jdbc/plan ...))

seancorfield21:09:32

Otherwise put some sort of queue between the reducing function and the thing writing to the HTTP response.
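A rough sketch of that shape, using a bounded java.util.concurrent.LinkedBlockingQueue for back pressure (ds, transform, and write-to-response! are placeholders, not part of next.jdbc):

(require '[next.jdbc :as jdbc])
(import '(java.util.concurrent LinkedBlockingQueue))

(def q (LinkedBlockingQueue. 100)) ; bounded: .put blocks when full, giving back pressure

;; Producer: reduce over plan on another thread; blocks whenever the consumer falls behind.
(future
  (reduce (fn [_ row] (.put q (transform row))) nil
          (jdbc/plan ds ["select * from big_table"]))
  (.put q ::done)) ; sentinel marking the end of the result set

;; Consumer (e.g. the request thread): drain the queue onto the HTTP response.
(loop []
  (let [item (.take q)]
    (when-not (= ::done item)
      (write-to-response! item) ; placeholder for serializing/writing one row
      (recur))))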

antonmos21:09:52

hmmmm. that would require that all layers of code above the db are able to consume a reducible. in my case, i am using ring+muuntaja to convert to transit. i will check if it can consume it directly.
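If muuntaja can't consume it directly, one possible shape (purely a sketch, assuming ring.util.io/piped-input-stream from ring-core and cognitect.transit; transform stands in for the business logic) is to do the reduce inside the function that produces the response body:

(require '[cognitect.transit :as transit]
         '[next.jdbc :as jdbc]
         '[ring.util.io :as ring-io])

;; Returns an InputStream suitable as a ring response body; the reduce over
;; plan writes each transformed row as transit while the result set streams in.
(defn streaming-body [ds sql-params]
  (ring-io/piped-input-stream
    (fn [out]
      (let [w (transit/writer out :json)]
        (reduce (fn [_ row] (transit/write w (transform row)))
                nil
                (jdbc/plan ds sql-params))))))

Note that piped-input-stream runs the writing function on another thread, so the connection is held open while the body is being written and exceptions there still need handling.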

seancorfield21:09:08

You need back pressure somewhere 🙂

seancorfield21:09:15

> that would require that all layers of code above the db are able to consume a reducible I don't understand why.

seancorfield21:09:37

When you reduce over plan, you're getting rid of the reducible.

antonmos21:09:54

that works only if the result of reduce fits into memory

antonmos21:09:45

if it doesn't, i have to return a reducible up to the point where it will be reduced by writing to the output stream

antonmos21:09:38

> that would require that all layers of code above the db are able to consume a reducible
well, normally, i can return a seq to middleware and it knows how to serialize it to the output stream

seancorfield21:09:08

And that's why you need a queue here. You need back pressure.

antonmos21:09:31

> You need back pressure somewhere i imagined that a lazyseq could provide that. is there any reason why that’s not as good as a reducible?

seancorfield21:09:53

Resource management.

seancorfield21:09:16

Lazy sequences are the wrong thing to use when you have side effects and/or I/O.

antonmos21:09:57

hmmmm. i wonder why… in my mind, a reducible is closed over a connection just as much as a lazyseq would have to be.

seancorfield21:09:15

The lazy sequence would not be.

antonmos21:09:32

why not? (i believe you, i am just trying to learn here)

seancorfield21:09:04

A reduce is guaranteed to have a start and an end -- so connections can be managed. If someone does (take 10 ...) on a lazy sequence of 100 records, the rest is not consumed and nothing can signal the "end" of processing. This was a big problem with clojure.java.jdbc and the result-set-fn when folks tried to do lazy sequence processing. That's why next.jdbc prevents that.

seancorfield21:09:27

Does that answer your question @U5CV9L3QV?

antonmos22:09:29

ohhh i see. the issue is that a lazy sequence hangs around until garbage collection gets to it, which is potentially way too long to hold on to a connection

antonmos22:09:40

thank you for clarifying that! i will try using plan or fold with a queue.

seancorfield22:09:43

Even GC'ing a lazy sequence won't signal the "end" for resource management.

antonmos22:09:34

in theory, couldn't you implement finalize() to make it release the connection?

seancorfield22:09:02

I'll leave it up to you to think about that in some more depth... 🙂

antonmos14:09:35

mmmmm, nope still not getting it.

seancorfield16:09:24

What are you actually going to implement finalize() on? We're talking about regular lazy seqs here. How would you "attach" a context specifying finalization that would be guaranteed to do the right thing? Since you have no idea how much of the result set will be consumed, you'd essentially need it attached to every part of the lazy seq, and then you run the risk of an early part being consumed and then GC'd and yet the rest of the sequence could be consumed later (requiring the connection to stay open). You can't attach it only to the end of the sequence because you don't know where it is: you would have to stream the entire result set from the DB so you could find the end of the sequence -- which is exactly what we're trying to avoid here.

antonmos02:09:11

oh i was thinking that we’d have to extend or implement our own lazyseq for this.

dominicm21:09:07

While on the topic, at the risk of distracting from the main conversation, can the reducible interface be used to implement a dynamic limit? Is there chunking?

seancorfield21:09:58

I'm not sure what you mean.

dominicm21:09:55

I mean SQL limit, or take-while, that kind of thing.

seancorfield21:09:40

(into [] (take 100) (jdbc/plan ...)) like that you mean?

dominicm21:09:13

Yeah. That would be an example.

dominicm21:09:31

Will it blow through memory if there are a bajillion rows in the result?

seancorfield21:09:24

I don't have the cycles right now to dig into that, sorry.

seancorfield21:09:07

If the take transducer short-circuits the reduce then I believe it would stop streaming from the DB and close the connection (but if you were streaming 1,000 rows at a time, it would still have read 1,000 rows into the JDBC driver -- but it wouldn't realize them into Clojure data structures).

seancorfield21:09:15

Looks like that would work -- the transducing take returns (ensure-reduced result) when it has consumed enough items.
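So an early-terminating query over a streamed result would look roughly like this (table and column are illustrative):

;; take's reduced value short-circuits the reduction, so next.jdbc
;; closes the ResultSet and Connection after roughly 100 rows.
(into []
      (comp (map :id) (take 100))
      (jdbc/plan ds ["select id from huge_table"]))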

dominicm21:09:34

Cool. No rush on this. Future planning. I don't suppose chunking can be controlled?

dominicm21:09:50

Thanks for figuring it out regardless!

seancorfield21:09:03

(and of course this is kind of the whole point behind plan and reduce and streaming from the DB -- that's the assumed behavior, if you can get your DB to actually cooperate 🙂 )

seancorfield21:09:26

Different DBs require different tickling in this area (and some just plain ol' don't support it).

dominicm21:09:25

I see. Where would I find the docs on this stuff? What's the terminology used by jdbc?

seancorfield21:09:50

Which "stuff"? Streaming result sets?

dominicm21:09:19

Yeah, although that might be it! :)

seancorfield21:09:43

I think searching for "streaming jdbc results", with your specific DB added, will probably get you what you need.

seancorfield21:09:06

JDBC is a mighty finicky beast and horribly non-portable 😐

dominicm21:09:33

https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor looks like I need to do something with a statement. I don't suppose next.jdbc handles this for me somehow? 😀

seancorfield22:09:59

You can just pass those arguments into plan and it will pass them into the PreparedStatement it creates.

seancorfield22:09:24

(so you don't have to use jdbc/prepare yourself)
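For PostgreSQL that translates to roughly the following (a sketch only; the option values are illustrative and the exact combination your driver needs may differ):

(require '[next.jdbc :as jdbc])

;; PostgreSQL only streams via a cursor when auto-commit is off and a fetch size is set.
(with-open [con (jdbc/get-connection ds {:auto-commit false})]
  (reduce (fn [_ row] (process row)) ; process is a placeholder for your row handling
          nil
          (jdbc/plan con ["select * from huge_table"]
                     {:fetch-size  1000
                      :concurrency :read-only
                      :cursors     :close
                      :result-type :forward-only})))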

seancorfield22:09:50

"All The Options" is a good section of the docs to read for the sort of things that are possible.
