This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2016-10-05
Channels
- # aleph (190)
- # bangalore-clj (4)
- # beginners (31)
- # boot (127)
- # braid-chat (2)
- # cider (2)
- # cljs-dev (79)
- # cljsrn (7)
- # clojure (81)
- # clojure-dev (1)
- # clojure-greece (40)
- # clojure-italy (3)
- # clojure-korea (8)
- # clojure-new-zealand (5)
- # clojure-russia (5)
- # clojure-spec (87)
- # clojure-uk (13)
- # clojurescript (50)
- # cloverage (10)
- # component (4)
- # core-async (37)
- # cursive (26)
- # datascript (20)
- # datomic (29)
- # editors (2)
- # emacs (12)
- # hoplon (63)
- # jobs (2)
- # lein-figwheel (1)
- # leiningen (17)
- # liberator (2)
- # off-topic (19)
- # om (31)
- # onyx (9)
- # pedestal (4)
- # proton (1)
- # re-frame (22)
- # reagent (13)
- # ring (1)
- # ring-swagger (9)
- # spacemacs (5)
- # specter (4)
- # untangled (24)
- # vim (29)
@dm3 [aleph "0.4.2-alpha8"]
with [manifold "0.1.6-alpha1"]
btw, I haven’t fully deployed it, but I’m running it on one node
Also trying to write local tests to prove that this can happen
but no success so far
no it’s a subscription on an event bus with a complex transform
but it should be a stream in the end
I did already find that the closing of streams works slightly differently than I had expected
yeah I’m trying to extract those things now. It’s all over the place now
I’ll try to make an equivalent of what I have in production without all the clutter
haha thanks
The manifold.stream/transform
doesn’t have a timeout on it
This is not the real issue i’m seeing in production, but this also doesn’t match my mental model of manifold
which also creates a pending-put
for the value 2
on the source stream (so you can think of the value as already in the stream)
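A minimal sketch of the pending-put behaviour described above (assuming the usual `[manifold.stream :as s]` alias):

```clojure
(require '[manifold.stream :as s])

;; put! on an unbuffered stream returns an unrealized deferred; the
;; value sits as a pending put until a consumer takes it.
(def source (s/stream))
(def p (s/put! source 2))
(realized? p)                            ;; => false, no consumer yet
(:pending-puts (s/description source))   ;; => 1
@(s/take! source)                        ;; => 2, which also realizes p
```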
Yes that’s what i think should happen. It seems the :timeout
option via connect does some skipping too
Btw I think I’ve replicated the production issue. Maybe it’s because I’m using a s/buffered-stream
and not s/stream
maybe i’m misusing the s/buffered-stream
What I really want to achieve is what zach describes in his presentation here [1], a way to prevent one slow consumer from blocking the rest, but I haven’t found how yet [1] https://www.youtube.com/watch?v=1bNOO3xxMc0&feature=youtu.be&t=1887
Yes I would have hoped that, but this never happened in production
And consequently new connections wouldn’t receive any messages, because, i think, the other subscribers were blocking the producer
The documentation on buffered-streams
implies that the buffer-size
shouldn't get this big i think:
(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit data, where the size of each message is defined by (metric message).
seems like a bug with buffered-stream… if you use (s/stream buffer-size), the buffer-size doesn’t go over the capacity
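For contrast, a sketch of the plain-stream behaviour being described (hypothetical REPL session, same aliases as above):

```clojure
(require '[manifold.stream :as s])

;; with (s/stream buffer-size), overflow shows up as pending puts
;; instead of the buffer growing past its capacity
(def st (s/stream 10))
(dotimes [i 100]
  (s/put! st i))
(select-keys (s/description st) [:buffer-capacity :buffer-size :pending-puts])
;; buffer-size stays at 10; the remaining puts are pending
```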
yeah this one is interesting too. It seems to be a very specific issue with buffered-stream as it doesn’t happen here in the same way:
(fact "Buffered stream does have a limit +1"
(let [s (s/buffered-stream 10)]
@(d/timeout! (s/put-all! s (range 100)) 100 :timed-out-like-expected) => :timed-out-like-expected
(s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))
What about this one?
(fact "Buffered stream goes wild in combination with another thread"
(let [s (s/buffered-stream 10)]
(future
(dotimes [i 100]
(s/put! s i)))
(Thread/sleep 100)
(s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))
yes I guess it has to queue somewhere. I thought too that buffered-stream would work like a sliding-buffer and drop values
Do you know how to achieve this with manifold? I had something like this with core.async
maybe i should use try-put! instead
if i don’t want to throttle on the rate I need to have a high rate and set a max-backlog I suppose
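A sketch of the try-put! idea (the timeout value and keyword are illustrative):

```clojure
(require '[manifold.stream :as s])

;; try-put! gives up after a timeout instead of parking the producer
;; forever, yielding the supplied default when the sink is too slow
(def sink (s/stream))
@(s/try-put! sink :msg 100 ::dropped)
;; => ::dropped (nothing took the value within 100 ms)
```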
thanks, I’ll have a look
Would you mark them all as issues? Or should I just go ahead and let Zach decide?
ah yeah, ok I’ll do that
thanks for your support 🙂
I think I’ll stay away from buffered-stream for now 🙂
yes, you are right, so I guess I could listen to the backpressure via a deref of manifold.bus/publish!
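A sketch of listening to that backpressure (names are illustrative; bus aliased to manifold.bus, d to manifold.deferred):

```clojure
(require '[manifold.bus :as bus]
         '[manifold.deferred :as d])

;; publish! returns a deferred that is realized once every subscriber
;; has accepted the message, so dereffing it (ideally with a timeout)
;; surfaces a slow subscriber as backpressure
(def b (bus/event-bus))
(d/timeout! (bus/publish! b :topic :msg) 100 ::subscribers-too-slow)
```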
I have replaced buffered-stream with a normal stream with a buffer and redeployed to see how it changes things
if the bus is configured with a timeout, the connection to the slow stream should be severed
Yeah I’ll have to think it through one more time
I have at least one other issue with manifold in combination with aleph. It might just be my misunderstanding. I’m sending a stream of server-sent events to an http client (in this case curl). I would suspect that aleph would close this stream on disconnect, right? Yet this is not something I have observed
ah thanks i’ll have a look
ah @lmergen you are right. I think the problem is with an intermediate transform:
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream)
([{:op "transducer"} << stream: {:pending-puts 2, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream)
([{:op "map"} << sink: {:type "callback"} >>])
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream)
[<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? true, :pending-takes 0, :buffer-capacity 0, :source? true} >>]
user=> (-> (bus/topic->subscribers event-bus) :aggregates first s/downstream first second s/downstream first second s/downstream first s/downstream)
nil
i think i have to create a transform that has a :timeout
btw, the above says the final stream is closed, but the intermediate ones are not
that said, it might be a weird mix, as there are also pending puts on the first one
yeah that was my first surprise indeed:
(fact "Transform doesn't close right away, but after a new put"
(let [source (s/stream)
sink (s/transform (map identity) source)]
(s/close! sink)
(s/closed? source) => false
(s/put! source :foo)
(s/closed? source) => true))
i guess it is also an optimization as you don’t have to go up the chain, but you do it lazily down the chain? Close whatever you can close along the way
but it is a gotcha though
btw, @lmergen i see you are in Amsterdam too 🙂 Haven’t been to the meetup lately. Do you do a lot with Aleph?
@jeroenvandijk we’re actually hosting the meetup nowadays 🙂
we’re heavy into the juxt ecosystem (most notably yada), which does everything with aleph
ah nice
i saw the meetup page indeed. I have to go to the meetups again 🙂
I’m trying to build a Hystrix clone. It works locally, but not so much under high load it seems
I’m positive Aleph/Manifold can do it though if i use it right
hmm locally the transform really doesn’t need a timeout to close. I think the high load really does something different somewhere. Maybe i should simplify the transform and see what happens
you might be interested in looking at https://github.com/tulos/manifail - tangentially related
ah thanks, definitely interesting!
The main thing I’m after in the beginning is convenient monitoring and latency control. short-circuiting would also be interesting. I’m not planning to build a one to one hystrix clone, but I would like to make something that is compatible with their dashboard
In what kind of application do you use this?
it has a nice and compact code base
yeah I’m working on a realtime bidding system. The remote services we use are normally reliable, but getting latency perfectly under control is harder. We have something that works, but could be better. I would like to apply semaphores and timeouts to get this better. A hystrix kind of abstraction in idiomatic clojure would be nice i think
you mean because of the Hystrix approach or just in general?
it is realtime bidding in the advertising domain so you still have 100ms or more per request, not too bad
it’s not bidding on a money exchange where it is submillisecond
yeah, you should be able to keep under that limit. However you are destined to have a fun time reducing allocations in Clojure code at some point 🙂
it has been live for a few years so nothing too bad 😎
i’m profiling my newly deployed code now, and i see clojure.lang.LockingTransaction$RetryEx coming up. Never seen that before. Does it ring a bell?
yeah, must be something like that, but I’ve never seen it before, so it must have something to do with the new code. So I guess it’s nothing manifold-specific
I also noted that there are some reflection issues with Manifold. I get quite a few warnings
The reflector is also scoring high in the profiling list
Ah the downside of debugging/profiling in a live system… something else is causing the hiccup
I think it is clojure.pprint/pprint
printing a lot of errors at the moment...
or rather, the agent that is calling pprint; agents have a LockingTransaction apparently
I have something working. There was one other issue where I didn’t add the newlines and so the SSE endpoint wouldn’t flush the events
thanks for the help!
@jeroenvandijk interesting you’re working on a hystrix clone! i was actually thinking about doing some work in this area as well, but am more looking for a higher-level “microservice client”, like what you could imagine the client-side counterpart of Yada to be
Cool 🙂 I can imagine it doesn’t matter much if it is on the server or on a client. My first approach was with core.async and I actually thought of creating a browser demo. But core.async is harder to wrap than manifold due to the dependency on macros. I’m not sure what needs to happen to make something work on a browser client, if that’s what you mean
true, but if you want to implement hystrix features like bulkheads, things start to become a bit more difficult with manifold i think
you mean bulkhead requests? or is that a term I don’t know?
never mind i see it is in hystrix
Yeah I’m not after complete thread isolation i guess
I would like to use semaphores to control the concurrency
that is actually pretty easy to implement with manifold
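A hypothetical sketch of that idea, bounding concurrency with a plain java.util.concurrent.Semaphore around deferred-returning calls (with-semaphore and the limit are made-up names, not an existing manifold API):

```clojure
(require '[manifold.deferred :as d])
(import 'java.util.concurrent.Semaphore)

(defn with-semaphore
  "Run the deferred-returning fn f only if a permit is available,
  releasing the permit when the deferred completes either way."
  [^Semaphore sem f]
  (if (.tryAcquire sem)
    (d/finally (f) #(.release sem))
    (d/error-deferred (ex-info "too many concurrent calls" {}))))

;; usage: at most 5 concurrent calls to some remote service
(def sem (Semaphore. 5))
@(with-semaphore sem #(d/success-deferred :ok))
;; => :ok
```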
Maybe thread isolation would be possible with setting specific executors
I’m not a manifold expert either, nor hystrix for that matter. So maybe I’m underestimating the full problem
The first version i have in mind is simple enough i think
If i get this reporting to work properly that is 🙂
I can give you a preview if you are interested in the semaphore implementation
Maybe I’m missing something. Why do you think bulkheads would be hard to implement with manifold?
When I read this http://stackoverflow.com/questions/30391809/what-is-bulkhead-pattern-used-by-hystrix I’m guessing that a semaphore (control the number of concurrent requests) and an executor with a maximum of threads would be equivalent
it’s important to realise that the problem that’s being solved is that of thread starvation in one place
if you do not allocate at least one thread to each “component”, you still face the problem that all your threads might be hanging in another place
Ok I think i’m just unfamiliar with the particular use case or I haven’t faced the issue yet. Do you have an example somewhere of an event like this?
For example, the Stack Overflow answer says that a semaphore approach cannot time out requests and a thread approach can. I don’t think I understand why the semaphore approach wouldn’t allow that
In previous efforts I found that a thread can also not be cleaned up cleanly in all cases. Which means that if your underlying code is faulty it doesn’t really matter how you wrap it. I hope to be proven wrong though
Consider the case where all your threads are blocked in some HTTP request. Indirectly, however, that request is issuing another request on your server, and is actually blocked on that
Then you have a deadlock purely because there is no thread available to handle that other request
hmm I think I see what you mean. So that extra thread would keep an eye on timeouts I guess? I mean without timeouts this situation would be unsolvable, right? I’ll think about it a bit more. I guess my application is a lot more simple and doesn’t have these deadlock situations AFAIK
Well it's meant to solve a specific type of failure scenario. It is comparable to a deadlock
I do want to say that using a semaphore does not solve the issue we’re describing here, unless your total number of threads is more than all your semaphore limits combined
Hmm yeah I think that’s true. The thing I’m missing is how this would be 100% prevented with a thread-based approach. If you have a threadpool you have the same issue, right? Only being able to create a new thread would circumvent this I guess
But I don’t think I have this deadlock scenario in my application. Maybe i’m ignorant or maybe I just work on a less complex system
I would love to chat about this some more later, as you do seem to have this problem. Maybe you can also help validate the hystrix clone 🙂
Well you can always come by if you're in Amsterdam. These things are fascinating and difficult to get right.
https://johnragan.wordpress.com/2009/12/08/release-it-stability-patterns-and-best-practices/
that book, Release It! is where bulkheads, circuit breakers etc are first described, btw
Yeah I guess it’s time to start reading it and stop being ignorant 😬
I’ll visit the next Clojure meetup, I see it’s next wednesday. Maybe we can meet there too
Have to run. Will post new results/questions here later 🙂
Thanks for all the help
basically, bulkheads are indirectly part of erlang's actor architecture, which in turn is stolen from CSP
@jeroenvandijk: would be awesome, I'll probably be there!
hm, so you are saying core.async enables easy bulkheads? I guess I don’t really understand what you mean by those