This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-09-10
@seancorfield hello! i am back with more questions about streaming.
After you added support for folding the streaming result set, we put the change in production and eventually ran into
`java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker`
This happened because the ForkJoin framework backing `r/fold` has magic that detects blocking operations and adds threads to compensate. (See "Compensation Threads" at http://moi.vonos.net/java/forkjoin/)
Based on this quote:
One special case of blocking operations has been optimised for work-stealing threadpools: the "synchronization primitive" classes in package java.util.concurrent recognise when a thread belonging to a work-stealing threadpool is about to perform a blocking operation (eg wait on a mutex). These classes then tell the associated threadpool to create an additional "compensation thread" before the current thread blocks. The number of "active threads" in the threadpool thus remains the same.
I looked at the thread dump and found where we were running into synchronization; it turned out to be in https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/LazySeq.java#L50, which is not easily avoidable.
So, we had to revert the changes to use r/fold.
I am now back to wanting to stream the results, and again struggling to figure out how to do that.
Note that now my main motivation is to reduce heap usage, i.e. I'd like the results of the query to be post-processed, converted to transit, and put on the wire continuously.
It may be possible that streaming all the way will make parallel processing not so necessary, but that remains to be seen.
The problem with `plan` is that I have to `reduce` over it. If I am not mistaken, even if I spin off futures to do the actual work, `reduce` will not return to the calling code until the entire result set has been traversed, which means I would not have avoided having the entirety of the result set in memory.
Is there another `next.jdbc` function that I am missing?
Thanks again for your help!
`plan` is the only way to stream large result sets and process them as they are streamed in. That processing must be eager (since it must all happen before the connection is closed) -- otherwise you have to realize the result set into memory (which you don't want).
You mention `LazySeq` but you don't say whether that was in your code or in `next.jdbc`?
It was in our code. Actually it's in `clojure.walk`, which is used by camel-snake-kebab, which we are calling.
Well... don't do that 🙂
The point of `plan` is to lift values out of each row quickly, without trying to realize rows, process those column values, and move on to the next row.
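A minimal sketch of what that looks like, assuming a `ds` datasource and made-up table/column names (not from this thread):

```clojure
(require '[next.jdbc :as jdbc])

;; Keyword access on each row reads the column value straight from the
;; ResultSet, so no row is ever realized as a Clojure hash map; only the
;; running total is kept in memory.
(reduce (fn [total row] (+ total (:amount row)))
        0
        (jdbc/plan ds ["select amount from orders"]))
```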
I think I am misunderstanding what you mean by
> That processing must be eager -- otherwise you have to realize the result set into memory (which you don't want).
Aren't those directly opposed? If I understand correctly, you are requiring use of `reduce` in order to prevent references to the result set from escaping. However, that is also exactly what's preventing me from starting to stream the results before streaming from the DB is finished.
I can certainly build around it -- access the DB in a separate thread, put the rows into a BlockingQueue, and consume it from the request thread.
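A rough sketch of that approach, assuming a `ds` datasource, a made-up query, and a `::done` sentinel (none of which are from the thread):

```clojure
(require '[next.jdbc :as jdbc])
(import '(java.util.concurrent LinkedBlockingQueue))

(defn stream-rows! [ds]
  (let [q (LinkedBlockingQueue. 1024)]                   ; bounded queue => back pressure
    (future
      (try
        (reduce (fn [_ row] (.put q (select-keys row [:id :total])))
                nil
                (jdbc/plan ds ["select id, total from orders"]))
        (finally
          (.put q ::done))))                             ; sentinel marks end of stream
    q))

;; Consumer side (e.g. the request thread):
;; (loop []
;;   (let [row (.take q)]
;;     (when-not (= ::done row)
;;       (write-to-response! row)                        ; hypothetical writer
;;       (recur))))
```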
What does your reducing function do?
You aren't producing a data structure from the result of `reduce`, I hope?
The intent is to get the rows out of the DB, apply business logic, and return them to the caller. Indeed, it is to produce a data structure from `reduce`, not a scalar.
That can't possibly work if the result set is large enough to warrant streaming it from the DB.
can you clarify why you say that? note that the calculations are applied and added to each individual row.
I don't know how to say it any clearer, sorry.
If a result set won't fit in memory, you have to stream it from the DB, processing it eagerly (consuming it -- not producing a data structure because that would be too big to fit in memory).
If your result set would fit in memory, you might as well just use `execute!`
Reducing over `plan` works (with streaming) if what you produce is small enough to fit in memory.
I think we are on the same page there. However, the issue is that in my case "consuming" equals "writing to the HTTP response", but `reduce` requires waiting until the whole result has been read from the DB.
> Reducing over `plan` works (with streaming) if what you produce is small enough to fit in memory.
Ah yes, that is what I am saying as well.
If you need to stream the results to another process, then you can `reduce` over `plan` to put it on a queue that another process can consume.
If you can stream results into your HTTP response, then `reduce` over `plan` works there -- with the reducing function pushing each (processed) row out to the HTTP response:
`(reduce (fn [_ row] (send (transformed row) to-output)) nil (jdbc/plan ...))`
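For the Ring side, a hedged sketch of that idea, assuming Ring 1.6+'s `ring.core.protocols` and made-up names for the datasource, query, and serialization (real code would write transit rather than plain strings):

```clojure
(require '[clojure.java.io :as io]
         '[next.jdbc :as jdbc]
         '[ring.core.protocols :as rp])

(defn rows-body [ds sql-params]
  (reify rp/StreamableResponseBody
    (write-body-to-stream [_ _ output-stream]
      (with-open [w (io/writer output-stream)]
        (reduce (fn [_ row]
                  ;; serialize each row as it streams in from the DB
                  (.write w (str (select-keys row [:id :total]) "\n")))
                nil
                (jdbc/plan ds sql-params))))))

;; A handler would return something like:
;; {:status 200 :body (rows-body ds ["select id, total from orders"])}
```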
Otherwise put some sort of queue between the reducing function and the thing writing to the HTTP response.
Hmmm, that would require that all layers of code above the DB are able to consume a reducible. In my case, I am using ring+muuntaja to convert to transit. I will check if it can consume it directly.
You need back pressure somewhere 🙂
> that would require that all layers of code above the db are able to consume a reducible
I don't understand why.
When you `reduce` over `plan`, you're getting rid of the reducible.
If it doesn't, I have to return a reducible up to the point where it will be reduced by writing to the OutputStream.
> that would require that all layers of code above the db are able to consume a reducible
Well, normally I can return a seq to the middleware and it knows how to serialize it to the OutputStream.
And that's why you need a queue here. You need back pressure.
> You need back pressure somewhere
I imagined that a lazy seq could provide that. Is there any reason why that's not as good as a reducible?
Resource management.
Lazy sequences are the wrong thing to use when you have side effects and/or I/O.
Hmmm, I wonder why… in my mind, a reducible is closed over a connection just as much as a lazy seq would have to be.
The lazy sequence would not be.
A `reduce` is guaranteed to have a start and an end -- so connections can be managed. If someone does `(take 10 ...)` on a lazy sequence of 100 records, the rest is not consumed and nothing can signal the "end" of processing. This was a big problem with `clojure.java.jdbc` and the `result-set-fn` when folks tried to do lazy sequence processing. That's why `next.jdbc` prevents that.
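A small sketch of the difference, assuming a `ds` datasource and a made-up query:

```clojure
(require '[next.jdbc :as jdbc])

;; The reduce begins and ends inside with-open, so the connection is closed
;; deterministically when it returns.
(with-open [conn (jdbc/get-connection ds)]
  (reduce (fn [acc row] (conj acc (:id row)))
          []
          (jdbc/plan conn ["select id from events"])))

;; A lazy sequence, by contrast, can escape the scope that owns the connection:
;; (take 10 lazy-rows) might be realized long after the connection should have
;; been closed, and nothing signals the "end" of processing.
```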
Does that answer your question @U5CV9L3QV?
Ohhh, I see. The issue is that a lazy sequence hangs around until garbage collection gets to it, which is potentially way too long to hold on to a connection.
Even GC'ing a lazy sequence won't signal the "end" for resource management.
I'll leave it up to you to think about that in some more depth... 🙂
What are you actually going to implement `finalize()` on? We're talking about regular lazy seqs here. How would you "attach" context-specific finalization that would be guaranteed to do the right thing? Since you have no idea how much of the result set will be consumed, you'd essentially need it attached to every part of the lazy seq, and then you run the risk of an early part being consumed and then GC'd while the rest of the sequence could still be consumed later (requiring the connection to stay open). You can't attach it only to the end of the sequence because you don't know where it is: you would have to stream the entire result set from the DB so you could find the end of the sequence -- which is exactly what we're trying to avoid here.
While on the topic, at the risk of distracting from the main conversation: can the reducible interface be used to implement a dynamic limit? Is there chunking?
I'm not sure what you mean.
`(into [] (take 100) (jdbc/plan ...))` -- like that, you mean?
I don't have the cycles right now to dig into that, sorry.
If the `take` transducer short-circuits the `reduce`, then I believe it would stop streaming from the DB and close the connection (but if you were streaming 1,000 rows at a time, it would still have read 1,000 rows into the JDBC driver -- it just wouldn't realize them into Clojure data structures).
Looks like that would work -- the transducing `take` returns `(ensure-reduced result)` when it has consumed enough items.
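A small illustration of that mechanism, outside of any database: `reduced` wraps the accumulator, `reduce` stops consuming input, and `plan` sees the same signal so it can close the connection. (The table and column names below are assumptions.)

```clojure
;; `reduced` short-circuits a reduce: the remaining input is never consumed.
(reduce (fn [acc x]
          (let [acc' (conj acc x)]
            (if (= 3 (count acc'))
              (reduced acc')
              acc')))
        []
        (range))
;; => [0 1 2]

;; With plan, a transducing take does the same thing; the map step realizes
;; just the selected columns from each row:
;; (into []
;;       (comp (map #(select-keys % [:id :name])) (take 100))
;;       (jdbc/plan ds ["select id, name from big_table"]))
```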
(And of course this is kind of the whole point behind `plan` and `reduce` and streaming from the DB -- that's the assumed behavior, if you can get your DB to actually cooperate 🙂)
Different DBs require different tickling in this area (and some just plain ol' don't support it).
I see. Where would I find the docs on this stuff? What's the terminology used by jdbc?
Which "stuff"? Streaming result sets?
I think searching for `streaming jdbc results` will probably get you what you need, with your specific DB added.
JDBC is a mighty finicky beast and horribly non-portable 😐
https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor looks like I need to do something with a statement. I don't suppose next.jdbc handles this for me somehow? 😀
Found https://cljdoc.org/d/seancorfield/next.jdbc/1.1.582/doc/getting-started/prepared-statements :)
You can just pass those arguments into `plan` and it will pass them into the `PreparedStatement` it creates.
(So you don't have to use `jdbc/prepare` yourself.)
"All The Options" is a good section of the docs to read from the sort of things that are possible.
Specifically https://cljdoc.org/d/seancorfield/next.jdbc/1.1.588/doc/all-the-options#statements--prepared-statements