#aleph
2016-10-05
jeroenvandijk08:10:08

@dm3 [aleph "0.4.2-alpha8"] with [manifold "0.1.6-alpha1"]

jeroenvandijk08:10:48

btw, I haven’t fully deployed it, but I’m running it on one node

dm308:10:00

I looked at the code and can't see how this can happen

jeroenvandijk08:10:03

Also trying to write local tests to prove that this can happen

dm308:10:07

is this a basic (s/stream)

jeroenvandijk08:10:12

but no success so far

jeroenvandijk08:10:26

no it’s a subscription on an event bus with a complex transform

jeroenvandijk08:10:34

but it should be a stream in the end

dm308:10:42

yeah, there are different internal impls

dm308:10:07

how is the stream constructed? Do you specify an xform? How do you provide buffer size?

jeroenvandijk08:10:22

I did already find that the closing of streams works slightly differently than I had expected

jeroenvandijk08:10:50

yeah, I’m trying to extract those things now. It’s all over the place at the moment

jeroenvandijk08:10:29

I’ll try to make an equivalent of what I have in production without all the clutter

dm308:10:37

I feel for you 🙂

jeroenvandijk08:10:34

The manifold.stream/transform doesn’t have a timeout on it

jeroenvandijk08:10:45

This is not the real issue i’m seeing in production, but this also doesn’t match my mental model of manifold

dm308:10:30

I think this is how it works:

dm308:10:47

the first value always gets "taken" from the source stream at the point you connect

dm308:10:16

so, once connected, there will always be a pending-take on the source stream

dm308:10:30

and since you did put-all! on the source stream before that, it created a pending-put

dm308:10:48

so the value 1 is propagated to the sink stream once you connect

dm308:10:25

which also creates a pending-put for the value 2 on the source stream (so you can think of the value as already in the stream)

dm308:10:18

I guess the timeout should have happened for the value 2
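In code, the sequence dm3 walks through above looks roughly like this (a sketch, not from the original conversation; stream names are illustrative):

```clojure
(require '[manifold.stream :as s])

;; two queued puts on an unbuffered source stream
(def source (s/stream))
(def sink (s/stream))
(s/put-all! source [1 2])

;; connecting immediately takes the first value and forwards it to the
;; sink; value 2 stays behind as a pending-put on the source
(s/connect source sink)
@(s/take! sink) ;; value 1 arrives at the sink
```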

jeroenvandijk08:10:58

Yes that’s what i think should happen. It seems the :timeout option via connect does some skipping too

jeroenvandijk08:10:30

Btw I think I’ve replicated the production issue. Maybe it’s because I’m using a s/buffered-stream and not s/stream

jeroenvandijk09:10:07

maybe i’m misusing the s/buffered-stream

jeroenvandijk09:10:25

What I really want to achieve is what Zach describes in his presentation here [1]: a way to prevent one slow consumer from blocking the rest, but I haven’t found out how yet [1] https://www.youtube.com/watch?v=1bNOO3xxMc0&feature=youtu.be&t=1887

dm309:10:54

event bus should achieve that

dm309:10:05

the subscription should drop due to a timeout

jeroenvandijk09:10:50

Yes I would have hoped that, but this never happened in production

jeroenvandijk09:10:52

And consequently new connections wouldn’t receive any messages, because, i think, the other subscribers were blocking the producer

jeroenvandijk09:10:05

The documentation on buffered-streams implies that the buffer-size shouldn't get this big i think:

(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit data, where the size of each message is defined by (metric message).

dm309:10:06

seems like a bug with buffered-stream.. if you use (s/stream buffer-size), the buffer-size doesn't go over the capacity

jeroenvandijk09:10:56

yeah this one is interesting too. It seems to be a very specific issue with buffered-stream as it doesn’t happen here in the same way:

(fact "Buffered stream does have a limit +1"
      (let [s (s/buffered-stream 10)]

        @(d/timeout! (s/put-all! s (range 100)) 100 :timed-out-like-expected) => :timed-out-like-expected

        (s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))

dm309:10:01

ok, one issue is with publishing to the bus

dm309:10:15

you are not respecting the backpressure

dm309:10:19

on bus/publish!
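A hedged sketch of what "respecting backpressure" on bus/publish! could look like (the event-bus var and function name are hypothetical):

```clojure
(require '[manifold.bus :as bus])

(def event-bus (bus/event-bus))

;; publish! returns a deferred that resolves once the subscribers have
;; accepted the message; dereffing (or chaining on) it makes the producer
;; wait instead of letting puts pile up behind a slow subscriber
(defn publish-blocking! [topic msg]
  @(bus/publish! event-bus topic msg))
```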

jeroenvandijk09:10:07

What about this one?

(fact "Buffered stream goes wild in combination with another thread"
      (let [s (s/buffered-stream 10)]

        (future
          (dotimes [i 100]
            (s/put! s i)))

        (Thread/sleep 100)

        (s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))

dm309:10:38

when calling put!, you have to respect the backpressure

dm309:10:55

I'm not sure what it should be doing here though, if you don't respect it

dm309:10:09

it's probably undefined

dm309:10:22

accumulating values in the buffer doesn't seem like the best idea
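One way to make a fire-and-forget dotimes producer respect backpressure is to chain each put! on the previous one, e.g. with d/loop (a sketch, not the only idiom):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; issue the next put! only after the previous one has been accepted;
;; put! yields false once the stream is closed, which stops the loop
(defn produce! [stream xs]
  (d/loop [xs xs]
    (when (seq xs)
      (d/chain (s/put! stream (first xs))
               (fn [accepted?]
                 (when accepted?
                   (d/recur (rest xs))))))))
```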

jeroenvandijk09:10:44

yes I guess it has to queue somewhere. I thought too that buffered-stream would work like a sliding-buffer and drop values

jeroenvandijk09:10:09

Do you know how to achieve this with manifold? I had something like this with core.async

jeroenvandijk09:10:41

maybe i should use try-put! instead
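For sliding-buffer-like behaviour, try-put! with a zero timeout drops the value whenever the stream can't accept it immediately (a sketch; the ::dropped sentinel is illustrative):

```clojure
(require '[manifold.stream :as s])

;; yields a deferred of true (accepted), ::dropped (no capacity right
;; now), or false (stream closed)
(defn put-or-drop! [stream msg]
  (s/try-put! stream msg 0 ::dropped))
```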

dm309:10:42

stream/throttle?

dm309:10:33

you can also whip up something based on d/loop which I usually do

dm309:10:43

look at the throttle impl for example

jeroenvandijk09:10:43

if i don’t want to throttle on the rate I need to have a high rate and set a max-backlog I suppose

jeroenvandijk09:10:05

thanks, I’ll have a look

dm309:10:44

would be also great if you could create an issue with the test cases

dm309:10:03

to question the behaviour that we've seen

jeroenvandijk09:10:11

Would you mark them all as issues? Or should I just go ahead and let Zach decide?

dm309:10:36

I'd just have one issue

dm309:10:55

with the behaviour of buffered streams when puts don't respect the backpressure

jeroenvandijk09:10:07

ah yeah, ok I’ll do that

jeroenvandijk09:10:26

thanks for your support 🙂

dm309:10:58

thanks 🙂

jeroenvandijk10:10:28

I think I’ll stay away from buffered-stream for now 🙂

dm310:10:08

it's the same issue

dm310:10:25

zip runs put! on every item

jeroenvandijk10:10:05

yes you are right so I guess i could listen to the backpressure via a deref of manifold.bus/publish!

jeroenvandijk10:10:33

I have replaced buffered-stream with a normal stream with a buffer and redeployed to see how it changes things

dm310:10:59

if the bus is configured with a timeout, the connection to the slow stream should be severed

dm310:10:06

but you have to manage the backpressure

jeroenvandijk12:10:21

Yeah I’ll have to think it through one more time

jeroenvandijk12:10:35

I have at least one other issue with manifold in combination with aleph. It might just be my misunderstanding. I’m sending a stream of server-sent events to an http client (in this case curl). I would expect aleph to close this stream on disconnect, right? Yet this is not something I have observed

dm312:10:53

I don't have much experience with aleph

dm312:10:12

but I'd look for on-close in the sources

dm312:10:23

or something like that

dm312:10:52

in manifold you can only close a sink that you are writing to

jeroenvandijk12:10:31

ah thanks i’ll have a look

lmergen12:10:07

manifold streams will automatically close once all downstream sinks are closed

lmergen12:10:22

it could be that you created your streams in the wrong way, though

lmergen12:10:40

take a look at the upstream? or downstream? properties of your stream

jeroenvandijk12:10:05

ah @lmergen you are right. I think the problem is with an intermediate transform:

user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream)
([{:op "transducer"} << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream)
([{:op "map"} << sink: {:type "callback"} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream)
[<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>]
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream first s/downstream)
nil

jeroenvandijk12:10:27

i think i have to create a transform that has a :timeout

jeroenvandijk12:10:59

btw, the above says the final stream is closed, but the intermediate ones not

jeroenvandijk12:10:48

that said, it might be a weird mix, as there are also pending puts on the first one

dm312:10:15

ah right, always forget about that

dm312:10:22

sneaky auto close

dm312:10:53

also close signal doesn't propagate unless you put another value

jeroenvandijk12:10:12

yeah, that was my first surprise indeed:

(fact "Transform doesn't close right away, but after a new put"
      (let [source (s/stream)
            sink (s/transform (map identity) source)]
        (s/close! sink)
        (s/closed? source) => false

        (s/put! source :foo)
        (s/closed? source) => true))

lmergen12:10:46

i don’t think this is the best behavior, tbh

dm312:10:09

it's an implementation issue - not very easy to solve

jeroenvandijk12:10:40

i guess it is also an optimization as you don’t have to go up the chain, but you do it lazily down the chain? Close whatever you can close along the way

jeroenvandijk12:10:56

but it is a gotcha though

jeroenvandijk12:10:05

btw, @lmergen i see you are in Amsterdam too 🙂 Haven’t been to the meetup lately. Do you do a lot with Aleph?

lmergen12:10:41

@jeroenvandijk we’re actually hosting the meetup nowadays 🙂

lmergen12:10:50

we do quite a lot with aleph

lmergen12:10:10

we’re heavy into the juxt ecosystem (most notably yada), which does everything with aleph

lmergen12:10:26

on top of that, all our microservices use it for http clients as well

lmergen12:10:37

and… it has its rough edges, like these

jeroenvandijk12:10:08

i saw the meetup page indeed. I have to go to the meetups again 🙂

jeroenvandijk12:10:39

I’m trying to build a Hystrix clone. It works locally, but not so much under high load, it seems

jeroenvandijk12:10:19

I’m positive Aleph/Manifold can do it though if i use it right

jeroenvandijk13:10:08

hmm locally the transform really doesn’t need a timeout to close. I think the high load really does something different somewhere. Maybe i should simplify the transform and see what happens

dm313:10:56

you might be interested in looking at https://github.com/tulos/manifail - tangentially related

jeroenvandijk13:10:40

ah thanks, definitely interesting!

jeroenvandijk13:10:19

The main thing I’m after in the beginning is convenient monitoring and latency control. short-circuiting would also be interesting. I’m not planning to build a one to one hystrix clone, but I would like to make something that is compatible with their dashboard

jeroenvandijk13:10:13

In what kind of application do you use this?

dm313:10:51

retrying remote requests mostly

dm313:10:04

in various places

jeroenvandijk13:10:13

it has a nice and compact code base

dm313:10:45

only handles retries though

jeroenvandijk13:10:48

yeah I’m working on a realtime bidding system. The remote services we use are normally reliable, but getting latency perfectly under control is harder. We have something that works, but could be better. I would like to apply semaphores and timeouts to get this better. A hystrix kind of abstraction in idiomatic clojure would be nice i think

dm313:10:18

hehe, aren't you scared of GC pauses all over the place?

jeroenvandijk13:10:53

you mean because of the Hystrix approach or just in general?

jeroenvandijk13:10:20

it is realtime bidding in the advertising domain so you still have 100ms or more per request, not too bad

jeroenvandijk13:10:53

it’s not bidding on a money exchange where it is submillisecond

dm313:10:36

yeah, you should be able to keep under that limit. However you are destined to have a fun time reducing allocations in Clojure code at some point 🙂

jeroenvandijk13:10:05

it has been live for a few years so nothing too bad 😎

jeroenvandijk13:10:44

i’m profiling my newly deployed code now, and i see clojure.lang.LockingTransaction$RetryEx coming up. Never seen that before. Does it ring a bell?

dm313:10:47

ah, ok then 🙂

dm313:10:24

conflicts in swap!?

jeroenvandijk13:10:03

yeah, it must be something like that, but I’ve never seen it before, so it must have something to do with the new code. So I guess it’s nothing manifold-specific

jeroenvandijk13:10:50

I also noted that there are some reflection issues with Manifold. I get quite a few warnings

jeroenvandijk13:10:06

The reflector is also scoring high in the profiling list

jeroenvandijk13:10:00

Ah the downside of debugging/profiling in a live system… something else is causing the hiccup

dm313:10:01

I'd be surprised

jeroenvandijk13:10:38

I think it is clojure.pprint/pprint printing a lot of errors at the moment...

jeroenvandijk13:10:14

or rather, the agent that is calling pprint; agents use a LockingTransaction apparently

jeroenvandijk14:10:42

I have something working. There was one other issue where I didn’t add the newlines and so the SSE endpoint wouldn’t flush the events

jeroenvandijk14:10:48

thanks for the help!

lmergen15:10:23

@jeroenvandijk interesting you’re working on a hystrix clone! i was actually thinking about doing some work in this area as well, but I’m looking more for a higher-level “microservice client”, like what you could imagine the client-side counterpart of Yada to be

jeroenvandijk15:10:56

Cool 🙂 I can imagine it doesn’t matter much if it is on the server or on a client. My first approach was with core.async and I actually thought of creating a browser demo. But core.async is harder to wrap than manifold due to its dependency on macros. I’m not sure what needs to happen to make something work on a browser client, if that’s what you mean

lmergen15:10:26

true, but if you want to implement hystrix features like bulkheads, things start to become a bit more difficult with manifold i think

jeroenvandijk15:10:31

you mean bulkhead requests? or is that a term I don’t know?

jeroenvandijk15:10:04

never mind i see it is in hystrix

jeroenvandijk15:10:57

Yeah I’m not after complete thread isolation i guess

jeroenvandijk15:10:14

I would like to use semaphores to control the concurrency

jeroenvandijk15:10:37

that is actually pretty easy to implement with manifold
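A sketch of the semaphore idea: a plain java.util.concurrent.Semaphore around a deferred-returning operation (function names are hypothetical, and this fails fast rather than queueing):

```clojure
(require '[manifold.deferred :as d])
(import 'java.util.concurrent.Semaphore)

(defn with-permit
  "Runs f (which returns a deferred) if a permit is available,
  otherwise fails fast instead of queueing behind the limit."
  [^Semaphore sem f]
  (if (.tryAcquire sem)
    (d/finally (f) #(.release sem))
    (d/error-deferred (ex-info "concurrency limit reached" {}))))
```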

jeroenvandijk15:10:21

Maybe thread isolation would be possible with setting specific executors

jeroenvandijk15:10:51

I’m not a manifold expert either, nor hystrix for that matter. So maybe I’m underestimating the full problem

jeroenvandijk15:10:08

The first version i have in mind is simple enough i think

jeroenvandijk15:10:20

If i get this reporting to work properly that is 🙂

jeroenvandijk15:10:33

I can give you a preview if you are interested in the semaphore implementation

jeroenvandijk15:10:31

Maybe I’m missing something. Why do you think bulkheads would be hard to implement with manifold?

jeroenvandijk15:10:43

When I read this http://stackoverflow.com/questions/30391809/what-is-bulkhead-pattern-used-by-hystrix I’m guessing that a semaphore (controlling the number of concurrent requests) and an executor with a maximum number of threads would be equivalent
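The thread-pool variant of the same idea can be sketched with a dedicated fixed-size pool per dependency via plain Java interop (pool size and names are illustrative):

```clojure
(import '(java.util.concurrent Executors ExecutorService))

;; each downstream dependency gets its own small pool, so a slow
;; dependency can only exhaust its own threads
(def payments-pool (Executors/newFixedThreadPool 4))

(defn submit-isolated
  "Runs the zero-arg fn f on the given pool, returning a j.u.c.Future."
  [^ExecutorService pool ^Callable f]
  (.submit pool f))
```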

lmergen15:10:58

it’s important to realise that the problem that’s being solved is that of thread starvation in one place

lmergen15:10:36

if you do not allocate at least one thread to each “component”, you still face the problem that all your threads might be hanging in another place

lmergen15:10:51

so this would not solve the deadlocking issue that is solved with bulkheads

jeroenvandijk15:10:24

Ok I think i’m just unfamiliar with the particular use case or I haven’t faced the issue yet. Do you have an example somewhere of an event like this?

jeroenvandijk15:10:49

For example, the stackoverflow says that a semaphore approach cannot timeout requests and a thread approach can. I don’t think i understand why the semaphore approach wouldn’t allow that

jeroenvandijk16:10:46

In previous efforts I found that a thread can also not be cleaned up cleanly in all cases. Which means that if your underlying code is faulty it doesn’t really matter how you wrap it. I hope to be proven wrong though

lmergen16:10:31

Consider the case where all your threads are blocked in some HTTP request. Indirectly, however, that request is issuing another request on your server, and is actually blocked on that

lmergen16:10:59

Then you have a deadlock purely because there is no thread available to handle that other request

jeroenvandijk17:10:16

hmm i think i see what you mean. So that extra thread would keep an eye on timeouts I guess? I mean, without timeouts this situation would be unsolvable, right? I'll think about it a bit more. I guess my application is a lot simpler and doesn’t have these deadlock situations AFAIK

lmergen17:10:22

Well it's meant to solve a specific type of failure scenario. It is comparable to a deadlock

lmergen17:10:32

However, it is distributed

lmergen17:10:07

I do want to say that using a semaphore does not solve the issue we're describing here, unless your total number of threads is greater than all your semaphore limits combined

jeroenvandijk17:10:13

Hmm yeah i think that’s true. The thing i’m missing is how this would be 100% prevented with a thread based approach. If you have a threadpool you have the same issue, right? Only being able to create a new thread would circumvent this I guess

jeroenvandijk17:10:19

But I don’t think I have this deadlock scenario in my application. Maybe i’m ignorant or maybe I just work on a less complex system

jeroenvandijk17:10:42

I would love to chat about this some more later, as you do seem to have this problem. Maybe you can also help validate the hystrix clone 🙂

lmergen17:10:34

Well you can always come by if you're in Amsterdam. These things are fascinating and difficult to get right.

lmergen17:10:58

that book, Release It!, is where bulkheads, circuit breakers etc. are first described, btw

jeroenvandijk17:10:40

Yeah I guess it’s time to start reading it and stop being ignorant 😬

jeroenvandijk17:10:19

I’ll visit the next Clojure meetup, I see it’s next wednesday. Maybe we can meet there too

jeroenvandijk17:10:39

Have to run. Will post new results/questions here later 🙂

jeroenvandijk17:10:50

Thanks for all the help

dm317:10:28

it seems in general bulkheads are hard to achieve on the JVM

dm317:10:11

if you have a runtime like Erlang's, your process just dies and restarts fresh

dm317:10:20

independent of others

lmergen17:10:58

@dm3: so you could pretty much say that bulkheads are necessary because of the JVM

lmergen17:10:42

basically, bulkheads are indirectly part of erlang's actor architecture, which in turn is stolen from CSP

lmergen17:10:03

in other words, people have once again already figured all this out in the 60s :)

dm317:10:22

I think it's a property of runtime, no?

lmergen17:10:36

@jeroenvandijk: would be awesome, I'll probably be there!

lmergen17:10:56

@dm3: it's a property of the concurrency model

dm317:10:36

hm, so you are saying core.async enables easy bulkheads? I guess I don't really understand what you mean by those

lmergen17:10:59

maybe, i'm not very familiar with core.async, but as far as I'm aware it shares a thread pool between actors, right?

lmergen17:10:27

anyway, you pretty much just want one threadpool (or thread) per actor

dm317:10:19

well, core.async is an implementation of CSP ideas

dm317:10:01

I guess a programming model + a proper runtime makes a robust system

dm317:10:19

you have natural bulkheads and the ability to fail fast

dm317:10:37

in erlang

lmergen17:10:49

yes, but it focuses mostly on abstractions of CSP

lmergen17:10:28

it's not pure CSP, probably because of the ease of use