This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-08-07
Channels
- # bangalore-clj (2)
- # beginners (53)
- # boot (30)
- # cider (27)
- # clara (1)
- # cljs-dev (18)
- # cljsrn (16)
- # clojure (153)
- # clojure-brasil (1)
- # clojure-dusseldorf (5)
- # clojure-italy (20)
- # clojure-losangeles (3)
- # clojure-spec (4)
- # clojure-uk (177)
- # clojurescript (115)
- # component (4)
- # core-logic (1)
- # datomic (29)
- # emacs (9)
- # figwheel (2)
- # gorilla (1)
- # graphql (36)
- # hoplon (4)
- # jobs (1)
- # jobs-discuss (3)
- # juxt (2)
- # keechma (22)
- # lumo (4)
- # off-topic (1)
- # onyx (17)
- # parinfer (96)
- # protorepl (10)
- # re-frame (31)
- # reagent (14)
- # ring-swagger (17)
- # spacemacs (32)
Today is starting in a way that sets the theme for the week... train cancelled, the next train will get me in an hour late, and two trains' worth of people will be crushed onto it without enough seats.
As I sit on my own, on a different floor from any other team member, and only talk to them via Lync most of the time, I cannot understand why the client wants to pay me more to take longer to get to work and be available less!
@jasonbell unfortunately once I get to work in the faraday cage I can't communicate with you...so you can't cheer me up! You may gather I am still available for other contracts!
There are letters, SMS messages and, if really pushed, the Weevil Express to get food parcels to you. 🙂
yes... we had a great time. thank you @maleghast
So, anyhoo… Does anyone have any experience of using http connection pools for doing bulk, outbound http operations, for example from a Clojure app to ElasticSearch..?
I seem to be “running out of threads” when I queue up HTTP PUT operations to ElasticSearch, using a core.async channel as the buffer / queue and spandex as the ES client.
(the same number of operations doing inserts into postgreSQL, using a Hikari Connection Pool is no problem at all, and in fact I have successfully queued up thousands of insert jobs for the PG database without the app breaking a sweat)
@maleghast what is it that is "running out of threads" ? are you running out of core.async go-block threads ?
I’ll be honest @mccraigmccraig I don’t REALLY know… I am “new” to doing anything serious in Clojure and I am “new” to tracking down problems and reading stack traces, but my current guess is that I am overwhelming something with my ElasticSearch calls.
@maleghast spandex seems to have a bulk api
I am making multiple calls to an API over HTTP, but I am doing them over aleph (@dominicm helped me figure it out) and I can do hundreds of them without the app breaking a sweat now.
so you are using an aleph http client, and putting the response deferreds or values onto a core.async channel ?
I may need to expand my mind about ElasticSearch - I am doing individual PUT operations, but if I could use my channel processing to build a bulk upsert then that might solve the issue all by itself
my first thought about what would be running out of threads is the core.async go-block executor pool, in the case that you are doing some blocking operation inside a go-block
@mccraigmccraig - spandex has its own HTTP client (it might be aleph it might be http-kit it might be “home-rolled”, I haven’t checked)
@maleghast what is the stack you hit?
@mccraigmccraig - I probably am, as I am doing http i/o inside a go block.
aleph/manifold has an expanding threadpool by default
@maleghast if you are doing non-blocking http i/o inside the go-block that should be fine... it's only blocking ops which will likely cause the core.async threadpool to run out
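For context, the distinction being drawn here might be sketched like this (both `blocking-http-put!` and `nonblocking-http-put!` are hypothetical names, not anything from the actual codebase):

```clojure
(require '[clojure.core.async :as a :refer [go <!]])

;; BAD: a blocking call inside a go block ties up one of the small, fixed
;; pool of core.async dispatch threads for the whole HTTP round trip
(go (blocking-http-put! doc))

;; BETTER: a/thread runs the blocking op on its own thread and returns a
;; channel; the go block only parks on <!, it never blocks a dispatch thread
(go (<! (a/thread (blocking-http-put! doc))))

;; BEST: a truly non-blocking client that returns a channel directly,
;; so no extra thread is needed at all
(go (<! (nonblocking-http-put! doc)))
```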
Just tried to reset my app (reloaded repl style) and got this:
java.lang.OutOfMemoryError: unable to create new native thread
clojure.lang.ExceptionInfo: Error in component :web-server in system com.stuartsierra.component.SystemMap calling suspendable.core$eval11872$fn__11886$G__11861__11891@2b6d8c25
oh, wow - an OOM... i've seen that with a queue consumer which foolishly created a thread-per-topic - that's just too many threads being created for the vm limit, rather than a thread-pool running out
I pop a single document off the channel and send to ES, admittedly inside a go-loop
If I built a data structure of, say, 100 documents and then made one http call with the client..?
no, it means that something you are doing is creating threads - i would have a look in a profiler like visualvm, or do the old SIGQUIT trick, to get a threaddump out of your vm and see what is creating those threads
there is no reason at all why a core.async or manifold pipeline should not be able to support a very large number of concurrent queries with constant threads
well yeah, I figured, since I am getting from an API with “many” (100+) HTTP calls and I am using a core.async channel to queue up hundreds, indeed thousands of individual DB INSERT operations.
ah, so you are using thread-call from core.async - that creates a thread
do you have any buffering on your core.async pipeline ? if so, what is your buffer size ?
where are your thread-calls being done from ?
OK, so I have a go-loop that is consuming a channel, and inside that go-loop I call (thread …) each time I process something off the channel, ‘cos the operation is I/O
(go-loop []
  (let [[event stop] (alts! [rr-articles-for-es es-stop] :priority true)]
    (if (not (nil? stop))
      (do
        (<! (thread (add-article-to-elasticsearch event)))
        (println "Article added to ElasticSearch Index")
        (recur)))))
here’s the function that’s being called:
(defn add-article-to-elasticsearch
"Add Right Relevance Article to Local ElasticSearch Cluster / Index"
[articlemap]
(let [c (esclient/client {:hosts [(:es-cluster-url sentiment-cfg)] :basic-auth {:user "elastic" :password "changeme"}})
index (:index articlemap)
article (:article articlemap)]
(esclient/request c {:url (str "/" index "/rr-article/" (:id article))
:method :put
:body article})))
is add-article-to-elasticsearch not non-blocking ? you should be able to make it non-blocking if you are using spandex... and then you won't need the thread at all
OK, this is what I am trying to figure out - I don’t know how to make the spandex call non-blocking.
you are pulling docs off the core.async channel as quickly as core.async allows, and creating a thread for each doc... which quickly uses all of your RAM for thread stacks
do spandex calls like this https://github.com/mpenet/spandex#async-requests-coreasyncpromise-chan
and use a go block to extract the result from the channel and put that on your response channel - no thread required (don't use <!! to get the values from the response channel, as that example does - <!! is blocking - use <! inside a go block)
OK, will do in a little while - with no warning my wife has just handed me a bowl of soup. Apparently it’s lunchtime.
you are still getting an OOM creating threads ?
where else are you creating a thread ?
Here:
(go-loop []
  (let [[event stop] (alts! [rr-articles-for-pg pg-stop] :priority true)]
    (if (not (nil? stop))
      (do
        (<! (thread (add-article-to-postgres event database)))
        (recur)))))

(go-loop []
  (let [[event stop] (alts! [rr-articles-for-es es-stop] :priority true)]
    (if (not (nil? stop))
      (do
        (<! (add-article-to-elasticsearch event))
        (println "Article added to ElasticSearch Index")
        (recur)))))
I’ve taken the thread-call out of the second go-loop (as you can see) and updated (add-article-to-elasticsearch …) to use request-chan
thus:
(defn add-article-to-elasticsearch
"Add Right Relevance Article to Local ElasticSearch Cluster / Index"
[articlemap]
(let [c (esclient/client {:hosts [(:es-cluster-url sentiment-cfg)] :basic-auth {:user "elastic" :password "changeme"}})
index (:index articlemap)
article (:article articlemap)]
(esclient/request-chan c {:url (str "/" index "/rr-article/" (:id article))
:method :put
:body article})))
As before, the DB inserts all happen, no problems, only 215 of the 300 documents make it into ES before things blow up.
hmm. you need to figure out what is creating all the threads - if it's not your postgresql insert, and the fact that that is completing as expected indicates it isn't, and it's not your ES upsert then it must be something else ! visual-vm or SIGQUIT can help you to get thread-dumps
I suppose it could just be the impact of doing the DB stuff and the ES stuff side by side at the same time..?
not if the ES stuff isn't creating any threads
when i had a very similar OOM a while back it turned out that a gnatsd pubsub consumer client library i was using created a thread per topic - this only came to light when my production api processes started going belly up with OOMs
Hmmm, maybe spandex is doing something funky… /me goes to look (again) at spandex source
@maleghast get a thread-dump first, or look at the threads in visual-vm... the stack-traces on individual threads or thread-names will often give you a clue as to what is creating them
a SIGQUIT thread-dump is very easy to get
sigquit is easier
kill -QUIT <pid>
visual-vm (also yourkit for $) give you a load of other useful telemetry though, so are worth getting to know
Do I run the “job” and hit refresh on the thread-dump window, or wait until it’s died and then refresh it..?
@maleghast using VisualVM, you can see the live threads in a gui
I think I screwed up - I started the profiler and the app stopped responding. I am starting over, just gonna look at the Threads tab
Is it _possible_ that I need to kill my threads after I use them..? (and if so is there a “good” way to do that?)
Basically the “threads” tab in VisualVM is full of threads that are still running, but they can’t possibly all still be needed
Thread Dump is FULL of:
"I/O dispatcher 1697" #2111 prio=5 os_prio=31 tid=0x00007fe96fe0a800 nid=0x11ee03 runnable [0x000070008aaec000]
that doesn't look like it's from a core.async thread - those have names like "async-thread-macro-%d" - https://github.com/clojure/core.async/blob/f8e87e1625b1660b7f3b0aea044aad1327441741/src/main/clojure/clojure/core/async.clj#L426
uh, are you by any chance using the apache HttpClient somewhere - this issue seems to indicate that it uses threads with that name - https://github.com/Mashape/unirest-java/issues/11
hmm. on second look, that may be unrelated
I thought @maleghast had removed the call to thread around ES?
I have, but I added “request-chan” from spandex which uses “promise-chan” under the hood…
https://github.com/mpenet/spandex/blob/def13765130648595e6d0cdaaf1b840da4813400/src/clj/qbits/spandex.clj#L69 max conn count is per-client. Makes sense that each has its own threadpool.
hmm - looks like spandex is just using the ES java REST client under the hood
definitely try only creating a single client and re-using
it looks like the ES java REST client is built on netty, so it shouldn't have any need to create extraneous threads
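Concretely, the single-client suggestion might look something like this - a sketch only, reusing the client options from the earlier snippet; `defonce` keeps one client (and hence one connection pool and one set of I/O dispatcher threads) for the life of the process, instead of creating a fresh client per document:

```clojure
;; one shared client for the whole process (options copied from the
;; earlier snippet in this thread)
(defonce es-client
  (esclient/client {:hosts      [(:es-cluster-url sentiment-cfg)]
                    :basic-auth {:user "elastic" :password "changeme"}}))

(defn add-article-to-elasticsearch
  "Non-blocking upsert of one article, reusing the shared client.
  Returns the channel that request-chan yields."
  [articlemap]
  (let [{:keys [index article]} articlemap]
    (esclient/request-chan es-client
                           {:url    (str "/" index "/rr-article/" (:id article))
                            :method :put
                            :body   article})))
```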
Of course, now I have a new problem:
java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
    clojure.core.async.impl.channels.ManyToManyChannel/put!                      channels.clj:  152
    clojure.core.async.impl.ioc-macros/put!                                    ioc_macros.clj:  992
    clojure.core.async.impl.ioc-macros/run-state-machine-wrapped               ioc_macros.clj:  977
    edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--/fn      worker.clj:   21
    edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--         worker.clj:   20
    edge.sentiment.worker/add-articles-to-channel/fn                               worker.clj:   20
    ...
you can use a channel with a buffer as large as you need @maleghast , which will accept puts until its buffer is full
Weirdly, this ^^ did not happen before when I was populating _just_ the DB channel…
@maleghast the consumer of the postgresql channel was using thread, wasn't it ? so it would keep up with the put!s
buffers filling up might be a bit painful to get used to in the beginning, but it's the foundation of back-pressure and extremely useful for controlling resource usage (threads, concurrency etc) when you've gotten your head around it
I think my previous suggestion of a go thread per put! was actually advice from golang and the original csp paper, where they recommend using that.
my confusion comes from trying to figure out what happens to the “things” that don’t make it onto the channel because the buffer is full…
if you are putting them onto a channel with a >! inside a go then they pause until there is space in the buffer
yeah, that might be a very simple solution
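A minimal sketch of that parking behaviour, using nothing beyond core.async (the buffer size and document map are illustrative):

```clojure
(require '[clojure.core.async :as a])

;; a fixed buffer: at most 512 docs queued at any one time
(def docs-ch (a/chan 512))

;; inside a go block, >! parks (rather than erroring) once the buffer is
;; full, so a fast producer is automatically slowed to the consumer's pace
(a/go (a/>! docs-ch {:id 1}))

;; by contrast, a bare put! never parks - each call just adds a pending
;; put once the buffer is full, and more than 1024 pending puts on one
;; channel throws the AssertionError seen in the stack trace above
(a/put! docs-ch {:id 2})
```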
Right, that doesn’t work, but I think I know why… The jobs are not coming off the channel quickly enough, so making the buffer smaller is not helping, because the go-loop that is trying to put items onto the queue can have 1000s of things to put on it.
the error ^^ is telling me that it can’t put everything onto the channel that the app is trying to put on
@maleghast probably better not to spawn lots of go threads, and to use a large buffer on the ES worker, as it will catch up. But not so large that backpressure can't kick in.
hold on… Now that I “know” (i.e. am pretty sure) that the thread explosion was due to having many, many ES clients being created, why don’t I revert to using (thread …) to effectively throttle the I/O and see if that’s more stable?
“big” operations with hundreds of documents to be processed are having this issue regardless…
This issue is not happening with the DB inserts, where I have a pool of connections. I got the thread explosion under control by reducing the number of spandex clients to 1, but I _think_ that is causing the bottleneck whereby the jobs are not clearing off the channel fast enough, because I am only giving the entire process access to a single client. So, I have a question… If there were more clients available, in a pool, would that mitigate this issue?
@maleghast you can increase the max number of connections used by spandex
Yes, I’ve tried that… It does not seem to make much difference, but I may not have increased things enough…
@dominicm - I’ve been doing some more research / reading and the error message is about the queue that things go into if the channel is buffered and full, but the channels in question are not buffered… I am wondering if the issue is with using spandex’s async-chan, that uses core.async’s promise-chan under the hood…
https://github.com/mpenet/spandex/blob/master/README.md#async-requests-callbacks Channels unnecessary
I wonder - I’ve been reading Stack Overflow - if I set a stupendously high buffer value for the ES document queue is there a chance that this will just go away as there is no chance that the buffer will fill up, so the PUT queue will never come into play..?
but raising the max-conn-per-route and max-conn-total params for the client makes things worse…
@maleghast relying on never filling it up = 💥
But upping the connections to 500 and 1000 respectively actually made things worse.
I am considering trying the _bulk approach - instead of enqueuing the docs individually I could transform the data structure and use the bulk approach exposed by spandex
@maleghast I'd queue docs individually, but use manifold to group into a minimum of X in a Yms window
I need a vector of maps, instruction => data and the _bulk approach reduces the number of calls and connections to ES dramatically…
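The reshaping itself can be plain Clojure - a sketch, where the function name and the exact action/key names (`:index`, `:_type`, `:_id`) are assumptions about what the _bulk endpoint expects, not anything from the actual codebase:

```clojure
(defn articles->bulk-body
  "Sketch: flatten a seq of articlemaps (as used elsewhere in this thread)
  into the alternating action-map / source-map shape that ES _bulk expects."
  [articlemaps]
  (mapcat (fn [{:keys [index article]}]
            [{:index {:_index index :_type "rr-article" :_id (:id article)}}
             article])
          articlemaps))
```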
I don’t really _get_ what it does, but I will look at the Manifold docs / examples again
Okay, think I've got it. Nice little example:
(let [s (s/stream)
      b (s/batch 2 s)]
  (s/put-all! s [1 2 3])
  (s/close! s)
  (assert (= [[1 2] [3]] (s/stream->seq b))))
Basically batch takes a stream it will read from, and returns a new stream for you to read from.
This may need me to stop for the evening and look again tomorrow, I am even more confused now…
(not that I don’t appreciate the help, I do, I just don’t seem to be able to get my head around what you are suggesting…)
@maleghast the conclusion of the method is that it will batch your single items into either:
- however many you insert in up to (e.g.) 250ms
- up to (e.g.) 100 items
And you will get out a single vector with however many were put onto the channel in that time
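Putting the pieces together, the suggestion might be sketched like this - the batch size, the 250ms window, and `bulk-upsert!` are all assumptions for illustration:

```clojure
(require '[manifold.stream :as s])

(def doc-stream (s/stream))

;; group single docs into vectors of up to 100 items, or however many
;; have arrived within a 250ms window, whichever fills first
(def doc-batches (s/batch 100 250 doc-stream))

;; one _bulk request per batch instead of one request per document;
;; bulk-upsert! is a hypothetical fn wrapping the ES _bulk call
(s/consume (fn [batch] (bulk-upsert! batch)) doc-batches)
```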