This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-01-06
Channels
- # aleph (13)
- # announcements (1)
- # babashka (89)
- # beginners (23)
- # calva (14)
- # circleci (7)
- # clj-kondo (39)
- # clj-on-windows (1)
- # cljdoc (5)
- # cljsrn (29)
- # clojure (98)
- # clojure-art (3)
- # clojure-conj (5)
- # clojure-europe (14)
- # clojure-nl (1)
- # clojure-norway (9)
- # clojurescript (18)
- # clr (39)
- # code-art (3)
- # community-development (3)
- # cursive (3)
- # emacs (11)
- # events (1)
- # fulcro (12)
- # graalvm-mobile (16)
- # graphql (3)
- # gratitude (1)
- # honeysql (19)
- # java (7)
- # joyride (23)
- # lsp (22)
- # malli (2)
- # missionary (25)
- # off-topic (15)
- # polylith (15)
- # rdf (5)
- # reagent (9)
- # reitit (3)
- # scittle (3)
- # shadow-cljs (37)
- # slack-help (2)
- # sql (10)
Hi everyone 👋 I am using aleph to handle SSE on the server side and I am not sure that the approach I have used is ideal, so I am looking for help. The code is structured like this: • For each "room" I create a source stream and put it in atom hashmap (key room id, value the stream) • For each user connecting to a room I get the source stream, create a sink stream and connect it to the source ◦ and return the sink stream in the response of the SSE endpoint so that I can send events to users • When I need to broadcast messages I send them to the source stream which then forwards to the users • Once user disconnects, their sink stream is marked as drained, so periodically I look at all the source stream and, using s/downstream to get all connected stream, if all downstreams are drained I close the source stream and remove it from the atom Questions: • are there better ways to broadcast messages to users in a room? • is there a better way to handle user disconnecting and cleanup an empty room? Is the current approach leaking resources? • I was thinking about using an event-bus with a topic for each room and user subscriptions to that topic? How does it scale? Does it automatically cleanup disconnected users? Sorry for the long post, and thanks in advance for your help! 🙏
Hello Francesco! Do you have some code to share based on the approach you describe? Otherwise, I will give you an answer based on your message. 🙂
Hi Arnaud! Thanks for jumping into my question so quickly! 🙂 Here is the code of the whole application https://github.com/fpischedda/unrefined The state atom is handled https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/unrefined/state.clj#L9. User connection is handled https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/refinements/handlers.clj#L13 on the ring side and https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/refinements/events.clj#L13 I do the connection between source and user sink. And finally https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/unrefined/mr_clean.clj is the cleanup part. Let me know if I have to clarify something! 🙏
The global approach you are describing makes sense to me.
> • Once user disconnects, their sink stream is marked as drained, so periodically I look at all the source stream and, using s/downstream to get all connected stream, if all downstreams are drained I close the source stream and remove it from the atom
Your approach works, but you could also subscribe to on-drained
[1] instead of periodically checking the state of your stream.
I would anyway keep your mechanism in place though, but with a higher interval (every hour, two hours?).
> are there better ways to broadcast messages to users in a room?
For what it worth, I would have done it exactly this way.
> is there a better way to handle user disconnecting and cleanup an empty room? Is the current approach leaking resources?
You should be safe with your approach.
Talking about Netty internals, as soon as a channel becomes inactive, the stream is closed.
So you should be fine : https://github.com/clj-commons/aleph/blob/master/src/aleph/http/server.clj#L362
However, maybe you would like to protect your system somehow and do not keep channel actives forever?
> • I was thinking about using an event-bus with a topic for each room and user subscriptions to that topic? How does it scale? Does it automatically cleanup disconnected users?
Here I cannot say, I never used the event-bus
part of manifold. We have no issue open regarding it, but I don't think it's either battle-tested or heavily in use.
This is not a lot of code, so if you want to take a look : https://github.com/clj-commons/manifold/blob/master/src/manifold/bus.clj
[1] : https://aleph.io/codox/manifold/manifold.stream.html#var-on-drained
Thank a lot! This is my first time using aleph and I was afraid that my approach was not ideal. BTW the api offered by both aleph and manifold are great! I will look at the internals of event-bus even if I think I will stick to the current approach. Thank again for your help!
Yes, Zach Tellman did an amazing work in terms of API and on the provided abstractions. The documentation is a bit lacking and some internals and design choices unexplained. But both libraries are doing their jobs pretty well.
Hi Francesco.
First, some terminology, because the Manifold docs are a little vague on this. Draining applies to sources, and closing applies to sinks. The default stream is both a source and sink. So, when you say the user's sink stream is drained, do you mean the user has read all the messages they'll get? I ask, because it may be preferable to close the user's stream when they're done with it, instead of checking to see whether the stream is drained, for the reasons below.
I ask because it sounds you're duplicating a bit of Manifold's default behavior. If you closed a downstream, Manifold's default behavior is to close the upstream when there are no more open downstreams, unless the upstream was created with permanent?
set true (see stream*
).
The catch is, Manifold can delay clean up until necessary. E.g., see the code buried in manifold.stream.graph/handle-async-put
; Manifold won't close the upstream until it runs out of open sinks to put
to. So, if you stop putting messages on the upstream after you close the final downstream, the upstream could hang around for a while.
Your use case is a pub/sub architecture, and is exactly what event-bus
was built for. It automatically handles cleanup for you if you close the user's subscription stream. I suggest giving it a try.
Hi @U10EC98F5!
Thanks for the additional feedback.
Regarding the terminology, yes, I am a bit confused 😅
My assumptions are (were?):
• the room stream is both a sink (it receives events to be broadcasted) and a source (send events to user streams), but the main "role" here is to broadcast so I have called it the source
• user streams, s/connect
ed to room streams are again sinks (receive events from the room) and sources (send SSE data to the browser), and for simplicity I've called them sources
Probably I have oversimplified and I need to adjust my understanding of the terminology, thanks for the feedback! 🙏
> So, when you say the user's sink stream is drained, do you mean the user has read all the messages they'll get?
This is what I have observed:
• I have created a room and connected a user to it, using s/downstream
on the room's stream I get the user's stream which, inspected, shows these properties
Class: manifold.stream.default.Stream
Value: "<< stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type \"manifold\", :sink? true, :closed? false, :pending-takes 1...
--- Fields:
"__closedCallbacks" = ( )
"__drainedCallbacks" = ( )
"__isClosed" = false
"__isDrained" = false
• closing the web page, the user's stream is still connected to the room stream and looks like this
Class: manifold.stream.default.Stream
Value: "<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type \"manifold\", :sink? true, :closed? true, :pending-takes 0, ...
--- Fields:
"__closedCallbacks" = ( )
"__drainedCallbacks" = ( )
"__isClosed" = true
"__isDrained" = true
Before I didn't notice that it was both drained and closed, and based on your comment I should use s/closed?
instead of s/drained?
to understand if all users have closed their page/connection, is that correct? (the room stream is still not closed or drained at this point so I should look at all downstream streams)
I understand that the room's stream should be closed automatically at some point when there are no active downstream but the point is that events are generated after user action so if there are no users generating events then there will not be new put
s which would mark the upstream as closed, so it looks like that the manual cleaning process is still needed.
> Your use case is a pub/sub architecture, and is exactly what event-bus
was built for
I've learned about event-bus
only recently and I want to give it a try but I have a couple of concerns:
• how does it scale? should I have a bus for each room or one for the whole system with a topic per room?
• how does cleaning works with the event-bus
? should I expect it to automatically remove subscriptions on user disconnections? Should I manage event-bus
instances like I do for the stream approach?
I really need to read its code and get familiar with it before taking a decision 🙂
Thanks again for your help!After reading event-bus
sources I have decided to try it out and it is working properly; also many concerns have disappeared.
If you are curious https://github.com/fpischedda/unrefined/pull/37 are the changes to replace manual streams management with event-bus
Thank you Francesco for sharing!
We should definitely have some documentation regarding event-bus
on Aleph
I would be more than happy to try to write something about it but given my tiny experience with it I am not sure it would be a good idea 😅 Maybe I can write about my journey from stream to event-bus
@U0165ADKUDU Sorry for the delay; I've been traveling. Glad to hear it worked out, though! And PRs to clarify and improve docs are always welcome.
Hi Francesco.
First, some terminology, because the Manifold docs are a little vague on this. Draining applies to sources, and closing applies to sinks. The default stream is both a source and sink. So, when you say the user's sink stream is drained, do you mean the user has read all the messages they'll get? I ask, because it may be preferable to close the user's stream when they're done with it, instead of checking to see whether the stream is drained, for the reasons below.
I ask because it sounds you're duplicating a bit of Manifold's default behavior. If you closed a downstream, Manifold's default behavior is to close the upstream when there are no more open downstreams, unless the upstream was created with permanent?
set true (see stream*
).
The catch is, Manifold can delay clean up until necessary. E.g., see the code buried in manifold.stream.graph/handle-async-put
; Manifold won't close the upstream until it runs out of open sinks to put
to. So, if you stop putting messages on the upstream after you close the final downstream, the upstream could hang around for a while.
Your use case is a pub/sub architecture, and is exactly what event-bus
was built for. It automatically handles cleanup for you if you close the user's subscription stream. I suggest giving it a try.