Fork me on GitHub
#clojure-uk
<
2017-08-07
>
agile_geek06:08:14

Today's starting in the theme that the week may take...train cancelled, next train is going to get me in an hour late and two trains worth of people are going to be crushed on it with not enough seats.

agile_geek06:08:40

As I sit on my own on a different floor to any other team member and only talk to them via lync most of the time I cannot understand why client wants to pay me more to take longer to get to work and be available less!

jasonbell06:08:04

Well we’re here to cheer you up and get you through the day. Sounds rough dude.

agile_geek06:08:40

@jasonbell unfortunately once I get to work in the faraday cage I can't communicate with you...so you can't cheer me up! You may gather I am still available for other contracts!

jasonbell06:08:02

There’s letters, SMS messages and, if really pushed, the Weevil Express to get food parcels to you. 🙂

thomas09:08:30

(back from hols)

maleghast09:08:45

Hey there @thomas - did you have a lovely time?

thomas09:08:03

yes... we had a great time. thank you @maleghast

maleghast09:08:08

Splendid! 🙂

maleghast11:08:11

So, anyhoo… Does anyone have any experience of using http connection pools for doing bulk, outbound http operations, for example from a Clojure app to ElasticSearch..?

maleghast11:08:47

I seem to be “running out of threads” when I queue up HTTP PUT operations to ElasticSearch, using a core.async channel as the buffer / queue and spandex as the ES client.

maleghast11:08:29

(the same number of operations doing inserts into postgreSQL, using a Hikari Connection Pool is no problem at all, and in fact I have successfully queued up thousands of insert jobs for the PG database without the app breaking a sweat)

mccraigmccraig11:08:56

@maleghast what is it that is "running out of threads" ? are you running out of core.async go-block threads ?

maleghast11:08:17

I’ll be honest @mccraigmccraig I don’t REALLY know… I am “new” to doing anything serious in Clojure and I am “new” to tracking down problems and reading stack traces, but my current guess is that I am overwhelming something with my ElasticSearch calls.

dominicm11:08:30

@maleghast spandex seems to have a bulk api

dominicm11:08:46

manifold has a way of doing time windowing over a stream.

maleghast11:08:05

I am making multiple calls to an API over HTTP, but I am doing them over aleph (@dominicm helped me figure it out) and I can do hundreds of them without the app breaking a sweat now.

mccraigmccraig11:08:05

so you are using an aleph http client, and putting the response deferreds or values onto a core.async channel ?

maleghast11:08:23

I may need to expand my mind about ElastichSearch, I am doing individual PUT operations, but if I could use my channel processing to build a bulk upsert then that might solve the issue all by itself

mccraigmccraig11:08:02

my first thought about what would be running out of threads is the core.async go-block executor pool, in the case that you are doing some blocking operation inside a go-block

maleghast11:08:03

@mccraigmccraig - spandex has its own HTTP client (it might be aleph it might be http-kit it might be “home-rolled”, I haven’t checked)

dominicm11:08:35

@maleghast what is the stack you hit?

maleghast11:08:39

@mccraigmccraig - I probably am, as I am doing http i/o inside a go block.

mccraigmccraig11:08:40

aleph/manifold has an expanding threadpool by default

mccraigmccraig11:08:25

@maleghast if you are doing non-blocking http i/o inside the go-block that should be fine... it's only blocking ops which will likely cause the core.async threadpool to run out

maleghast11:08:10

Just tried to reset my app (reloaded repl style) and got this:

java.lang.OutOfMemoryError: unable to create new native thread
clojure.lang.ExceptionInfo: Error in component :web-server in system com.stuartsierra.component.SystemMap calling [email protected]

mccraigmccraig11:08:25

oh, wow - an OOM... i've seen that with a queue consumer which foolishly created a thread-per-topic - that's just too many threads being created for the vm limit, rather than a thread-pool running out

maleghast11:08:14

Perhaps it does indeed mean that I need to do bulk upserts to ES in that case..?

maleghast11:08:33

Every document I have is going one at a time at the moment

maleghast11:08:07

I pop a single document off the channel and send to ES, admittedly inside a go-loop

maleghast11:08:37

If I built a data structure of say 100 docucments and then made one http call with the client..?

mccraigmccraig11:08:40

no, it means that something you are doing is creating threads - i would have a look in a profiler like visualvm, or do the old SIGQUIT trick, to get a threaddump out of your vm and see what is creating those threads

mccraigmccraig11:08:46

there is no reason at all why a core.async or manifold pipeline should not be able to support a very large number of concurrent queries with constant threads

maleghast11:08:31

well yeah, I figured, since I am getting from an API with “many” (100+) HTTP calls and I am using a core.async channel to queue up hundreds, indeed thousands of individual DB INSERT operations.

mccraigmccraig11:08:41

ah, so you are using thread-call from core.async - that creates a thread

mccraigmccraig11:08:30

do you have any buffering on your core.async pipeline ? if so, what is your buffer size ?

maleghast11:08:43

no, no buffers (at the moment)

mccraigmccraig11:08:07

where are your thread-calls being done from ?

maleghast11:08:14

OK, so I have a go-loop that is consuming a channel, and inside that go-loop I call (thread …) each time I process something off the channel, ‘cos the operation is I/O

maleghast11:08:15

hold on will paste something:

maleghast11:08:27

(go-loop []
        (let [[event stop] (alts! [rr-articles-for-es es-stop] :priority true)]
          (if (not (nil? stop))
            (do
              (<! (thread (add-article-to-elasticsearch event)))
              (println "Article added to ElasticSearch Index")
              (recur)))))

maleghast11:08:18

here’s the function that’s being called:

(defn add-article-to-elasticsearch
     "Add Right Relevance Article to Local ElasticSearch Cluster / Index"
     [articlemap]
     (let [c (esclient/client {:hosts [(:es-cluster-url sentiment-cfg)] :basic-auth {:user "elastic" :password "changeme"}})
           index (:index articlemap)
           article (:article articlemap)]
       (esclient/request c {:url (str "/" index "/rr-article/" (:id article))
                            :method :put
                            :body article})))

maleghast11:08:46

(ES is running locally in a docker container)

maleghast11:08:13

esclient is spandex

mccraigmccraig11:08:26

is not add-article-to-elasticsearch non-blocking ? you should be able to make it non-blocking if you are using spandex... and then you won't need the thread at all

maleghast11:08:57

OK, this is what I am trying to figure out - I don’t know how to make the spandex call non-blocking.

mccraigmccraig11:08:12

you are pulling docs off the core.async channel as quickly as core.async allows, and creating a thread for each doc... which quickly uses all of your RAM for thread stacks

mccraigmccraig11:08:10

and use a go block to extract the result from the channel and put that on your response channel

mccraigmccraig11:08:17

no thread required

maleghast11:08:41

Excellent, thanks - I will go look at the async example…

mccraigmccraig11:08:31

(don't use <!! to get the values from the response channel, as that example does - <!! is blocking - use <! inside a go block)

maleghast11:08:09

OK, will do in a little while - with no warning my wife has just handed me a bowl of soup. Apparently it’s lunchtime.

maleghast11:08:14

biab after soup…

maleghast12:08:07

homemade parsnip soup for the win!

maleghast12:08:57

OK, so I tried that and I am still hitting the wall…

mccraigmccraig12:08:55

you are still getting an OOM creating threads ?

mccraigmccraig12:08:02

where else are you creating a thread ?

maleghast12:08:14

Around the DB action.

maleghast12:08:48

Here:

(go-loop []
        (let [[event stop] (alts! [rr-articles-for-pg pg-stop] :priority true)]
          (if (not (nil? stop))
            (do
              (<! (thread (add-article-to-postgres event database)))
              (recur)))))
      (go-loop []
        (let [[event stop] (alts! [rr-articles-for-es es-stop] :priority true)]
          (if (not (nil? stop))
            (do
              (<! (add-article-to-elasticsearch event))
              (println "Article added to ElasticSearch Index")
              (recur)))))

maleghast12:08:41

I’ve taken the thread-call out of the second go-loop (as you can see) and updated (add-article-to-elasticsearch …) to use request-chan

maleghast12:08:00

thus:

(defn add-article-to-elasticsearch
     "Add Right Relevance Article to Local ElasticSearch Cluster / Index"
     [articlemap]
     (let [c (esclient/client {:hosts [(:es-cluster-url sentiment-cfg)] :basic-auth {:user "elastic" :password "changeme"}})
           index (:index articlemap)
           article (:article articlemap)]
       (esclient/request-chan c {:url (str "/" index "/rr-article/" (:id article))
                            :method :put
                            :body article})))

maleghast12:08:37

As before, the DB inserts all happen, no problems, only 215 of the 300 documents make it into ES before things blow up.

mccraigmccraig12:08:11

hmm. you need to figure out what is creating all the threads - if it's not your postgresql insert, and the fact that that is completing as expected indicates it isn't, and it's not your ES upsert then it must be something else ! visual-vm or SIGQUIT can help you to get thread-dumps

maleghast12:08:12

I suppose it could just be the impact of doing the DB stuff and the ES stuff side by side at the same time..?

mccraigmccraig12:08:38

not if the ES stuff isn't creating any threads

mccraigmccraig12:08:07

when i had a very similar OOM a while back it turned out that a gnatsd pubsub consumer client library i was using created a thread per topic - this only came to light when my production api processes started going belly up with OOMs

maleghast12:08:55

Hmmm, maybe spandex is doing something funky… /me goes to look (again) at spandex source

mccraigmccraig12:08:29

@maleghast get a thread-dump first, or look at the threads in visual-vm... the stack-traces on individual threads or thread-names will often give you a clue as to what is creating them

mccraigmccraig12:08:45

a SIGQUIT thread-dump is very easy to get

maleghast12:08:06

OK, I am going to need to go and get visual-vm and figure out how to use it…

mccraigmccraig12:08:20

sigquit is easier

mccraigmccraig12:08:38

kill -QUIT <pid>

mccraigmccraig12:08:21

visual-vm (also yourkit for $) give you a load of other useful telemetry though, so are worth getting to know

maleghast12:08:43

I’ve downloaded and run up VisualVM

maleghast12:08:02

I’ve found the boot process and right-clicked on it and chosen Thread Dump

maleghast12:08:30

Do I run the “job” and hit refresh on the thread-dump window, or wait until it’s died and then refresh it..?

dominicm12:08:39

@maleghast using VisualVM, you can see the live threads in a gui

maleghast12:08:36

Yeah, I found that “tab” as well…

dominicm12:08:45

Anything stand out?

maleghast13:08:15

I think I screwed up - I started the profiler and the app stopped responding. I am starting over just gonna look at the threads tab

maleghast13:08:53

OK, so the process that I kicked off spawns over 2000 threads

maleghast13:08:41

Is it possible__ that I need to kill my threads after I use them..? (and if so is there a “good” way to do that?)

maleghast13:08:10

Basically the “threads” tab in VisualVM is full of threads that are still running, but they can’t possibly all still be needed

maleghast13:08:14

Thread Dump is FULL of:

"I/O dispatcher 1697" #2111 prio=5 os_prio=31 tid=0x00007fe96fe0a800 nid=0x11ee03 runnable [0x000070008aaec000]

maleghast13:08:36

I’ve seen numbers as high as 1714 for I/O dispatcher

mccraigmccraig13:08:38

that doesn't look like it's from a core.async thread - those have names like "async-thread-macro-%d" - https://github.com/clojure/core.async/blob/f8e87e1625b1660b7f3b0aea044aad1327441741/src/main/clojure/clojure/core/async.clj#L426

maleghast13:08:31

There are some__ of those - maybe 20 or so

mccraigmccraig13:08:14

uh, are you by any chance using the apache HttpClient somewhere - this issue seems to indicate that it uses threads with that name - https://github.com/Mashape/unirest-java/issues/11

mccraigmccraig13:08:15

hmm. on second look, that may be unrelated

dominicm13:08:57

I thought @maleghast had removed the call to thread around ES?

dominicm13:08:25

however, it does appear a new client is being created for every request ^^

maleghast13:08:32

I have, but I added “request-chan” from spandex which uses “promise-chan” under the hood…

dominicm13:08:39

1 client = 1 thread?

dominicm13:08:51

I wonder if the client stores any state about the threadpool in it.

maleghast13:08:13

Well, also, I am creating the client for every request

maleghast13:08:28

perhaps I should create 1 client and re-use it..?

dominicm13:08:29

yeah, I'm thinking maybe (disclaimer: I don't use spandex)

maleghast13:08:21

I’ll try that first and see what happens…

mccraigmccraig13:08:08

hmm - looks like spandex is just using the ES java REST client under the hood

mccraigmccraig13:08:46

definitely try only creating a single client and re-using

mccraigmccraig13:08:42

it looks like the ES java REST client is built on netty, so it shouldn't have any need to create extraneous threads

maleghast13:08:33

250 threads - all operations complete, no errors or stack traces

maleghast13:08:06

now to try a much bigger query / operation and see what happens…

maleghast13:08:31

Of course, now I have a new problem:

edge.sentiment.worker/add-articles-to-channel/fn20               worker.clj:   20
edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--/fnAssert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
                                         clojure.core.async.impl.ioc-macros/run-state-machine-wrapped  (< (.size puts) impl/MAX-QUEUE-SIZE)
         ioc_macros.clj:             worker.clj:
 973edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--/fn

   edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--                         clojure.core.async.impl.channels.ManyToManyChannel/put!  20             worker.clj:           channels.clj:
 clojure.core.async.impl.channels.ManyToManyChannel/put!             clojure.core.async.impl.ioc-macros/run-state-machine-wrapped             ioc_macros.clj:  977           channels.clj:

152                  clojure.core.async.impl.channels.ManyToManyChannel/put!           ioc_macros.clj:            channels.clj: 973   20                                                                      ...

edge.sentiment.worker/add-articles-to-channel/fn/state-machine--auto--/fn               worker.clj:   21
                                                                      ...                                    clojure.core.async.impl.ioc-macros/put!           ioc_macros.clj:                          992
                  clojure.core.async.impl.channels.ManyToManyChannel/put!             channels.clj:           channels.clj:   152 152

java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.

mccraigmccraig13:08:38

you can use a channel with a buffer as large as you need @maleghast , which will accept puts until its buffer is full

maleghast14:08:12

Weirdly, this ^^ did not happen before when I was populating just__ the DB channel…

mccraigmccraig14:08:40

@maleghast the consumer of the postgresql channel was using thread wasn't it ? so it would keep up with the put!s

maleghast14:08:13

Oh yeah, that’s a good point

mccraigmccraig14:08:34

buffers filling up might be a bit painful to get used to in the beginning, but it's the foundation of back-pressure and extremely useful for controlling resource usage (threads, concurrency etc) when you've gotten your head around it

dominicm14:08:28

I think my previous suggestion of a go thread per put! was actually advice from golang and the original csp paper, where they recommend using that.

dominicm14:08:49

I think they actually do buffering via a # of go threads in the csp paper

maleghast14:08:58

my confusion comes from trying to figure out what happens to the “things” that don’t make it onto the channel because the buffer is full…

mccraigmccraig14:08:50

if you are putting them onto a channel with a >! inside a go then they pause until there is space in the buffer

maleghast14:08:14

Oh, I see…

maleghast14:08:27

Well that might make for a good idea then…

mccraigmccraig14:08:18

yeah, that might be a very simple solution

maleghast14:08:27

I am going to try that now and see what happens…

maleghast14:08:57

Right that doesn’t work, but I think that I know why… The jobs are not coming off the channel quickly enough, so buffering it to be smaller is not helping because the go-loop that is trying to put items onto the queue has the potential to have 1000s of things to put on that queue.

maleghast14:08:39

the error ^^ is telling me that it can’t put everything onto the channel that the app is trying to put on

dominicm14:08:37

@maleghast probably better to not spawn lots of go threads, and use a large buffer on ES worker, as it will catch up. But not so large that backpressure can't apply in.

maleghast14:08:57

When you say a large buffer, what sort of size were you thinking?

maleghast14:08:11

hold on… Now that I “know” (i.e. am pretty sure) that the thread explosion was due to having many, many ES clients being created, why don’t I revert to using (thread …) to effectively throttle the I/O and see if that’s more stable?

maleghast15:08:27

Still have the same problem…

maleghast15:08:01

“big” operations with hundreds of documents to be processed are having this issue regardless…

maleghast16:08:42

This issue is not happening with the DB inserts, where I have a pool of connections. I got the thread explosion under control by reducing the number of spandex clients to 1, but I think__ that is causing the bottleneck whereby the jobs are not clearing off the channel fast enough, because I am only giving the entire process access to a single client. so, I have a question… If there were more clients available, in a pool, would that mitigate this issue?

dominicm18:08:59

@maleghast you can increase the max number of connections used by spandex

maleghast18:08:10

Yes, I’ve tried that… It does not seem to make much difference, but I may not have increased things enough…

maleghast18:08:38

@dominicm - I’ve been doing some more research / reading and the error message is about the queue that things go into if the channel is buffered and full, but the channels in question are not buffered… I am wondering if the issue is with using spandex’s async-chan, that uses core.async’s promise-chan under the hood…

maleghast18:08:13

My point is, how can the PUT queue fill-up if the channel has no buffer?

dominicm18:08:23

Do you care about the response particularly?

dominicm18:08:15

You could use the other async api

maleghast19:08:38

I was wondering about that…

maleghast19:08:53

What happens if I use the async call and don’t specify a callback..?

maleghast19:08:11

All the DB inserts worked, only 1 document in the ES index…

maleghast19:08:57

I wonder - I’ve been reading Stack Overflow - if I set a stupendously high buffer value for the ES document queue is there a chance that this will just go away as there is no chance that the buffer will fill up, so the PUT queue will never come into play..?

maleghast19:08:23

Hmmmmm That works better, amazingly enough…

maleghast19:08:04

but raising the max-conn-per-route and mac-conn-total params for the client make things worse…

dominicm19:08:18

What happens when you do exactly?

dominicm19:08:17

@maleghast relying on never filling it up = 💥

maleghast19:08:46

Yeah, it seems to make a slight difference if I specify 100000 as the buffer size.

maleghast19:08:08

But upping the connections to 500 and 1000 repsectively actually made things worse.

dominicm19:08:25

I mean, don't you just want to bulk load?

maleghast19:08:58

I am considering trying the _bulk approach and instead of enqueing the docs individually I could transform the data structure and use the bulk approach exposed by spandex

dominicm19:08:03

@maleghast I'd queue docs individually, but use manifold to group into a minimum of X in a Yms window

maleghast19:08:36

I need a vector of maps, instruction => data and the _bulk approach reduces the number of calls and connections to ES dramatically…

maleghast20:08:54

So, connect my channel of documents to a “batch” before pushing them to ES?

dominicm20:08:23

Something like that. Recommend playing in a repl, not overly familiar with it.

maleghast20:08:30

I don’t really get__ what it does, but I will look at the Manifold docs / examples again

dominicm20:08:44

Okay, think I've got it. Nice little example:

(let [s (s/stream), b (s/batch 2 s)] (s/put-all! s [1 2 3]) (s/close! s) (assert (= [[1 2] [3]] (s/stream->seq b))))

dominicm20:08:59

Thanks copy paste on mobile

dominicm20:08:43

Basically batch takes a stream it will read from, and returns a new stream for you to read from.

maleghast20:08:32

This may need me to stop for the evening and look again tomorrow, I am even more confused now…

maleghast20:08:09

(not that I don’t appreciate the help, I do, I just don’t seem to be able to get my head around what you are suggesting…)

dominicm20:08:25

@maleghast the conclusion of the method, is that it will batch your single items into either: - However many you insert in up to (e.g.) 250ms - Up to (e.g.) 100 items And you will get out a single vector with however many were put onto the channel in that itme

dominicm20:08:54

(let [s (s/stream), b (s/batch 2 s)]
  (s/put-all! s [1 2 3])
  (s/close! s)
  (assert (= [[1 2] [3]] (s/stream->seq b))))
Put this into a buffer/the repl & have a fiddle with it, hopefully it will become clear

maleghast20:08:04

I see, I think