#aleph
2023-01-06
Francesco Pischedda 09:01:16

Hi everyone 👋 I am using aleph to handle SSE on the server side, and I am not sure that the approach I have used is ideal, so I am looking for help. The code is structured like this:
• For each "room" I create a source stream and put it in an atom holding a hashmap (key: room id, value: the stream)
• For each user connecting to a room I get the source stream, create a sink stream and connect it to the source
  ◦ and return the sink stream in the response of the SSE endpoint so that I can send events to users
• When I need to broadcast messages I send them to the source stream, which then forwards them to the users
• Once a user disconnects, their sink stream is marked as drained, so periodically I look at all the source streams and, using s/downstream to get all connected streams, if all downstreams are drained I close the source stream and remove it from the atom
Questions:
• Are there better ways to broadcast messages to users in a room?
• Is there a better way to handle user disconnection and clean up an empty room? Is the current approach leaking resources?
• I was thinking about using an event-bus with a topic for each room and user subscriptions to that topic. How does it scale? Does it automatically clean up disconnected users?
Sorry for the long post, and thanks in advance for your help! 🙏
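The setup described in this message can be sketched roughly as follows. This is a minimal illustration, not the code from the linked repository: the names `rooms`, `room-stream!`, `join!`, `broadcast!` and `sweep!` are hypothetical, and only `manifold.stream` functions mentioned in this thread or in the Manifold docs are used.

```clojure
(require '[manifold.stream :as s])

;; room id -> room stream (hypothetical registry, stands in for the state atom)
(defonce rooms (atom {}))

(defn room-stream!
  "Returns the stream for room-id, creating it on first use."
  [room-id]
  (-> (swap! rooms update room-id #(or % (s/stream)))
      (get room-id)))

(defn join!
  "Creates a per-user stream fed by the room stream.
  The returned stream can be used as the body of the SSE response."
  [room-id]
  (let [user (s/stream)]
    (s/connect (room-stream! room-id) user)
    user))

(defn broadcast!
  "Puts event on the room stream, which forwards it to every connected user."
  [room-id event]
  (when-let [room (get @rooms room-id)]
    (s/put! room event)))

(defn sweep!
  "Periodic cleanup: closes and forgets every room whose downstreams are all
  drained. s/downstream returns [description sink] pairs."
  []
  (doseq [[room-id room] @rooms
          :when (every? (fn [[_ sink]] (s/drained? sink)) (s/downstream room))]
    (s/close! room)
    (swap! rooms dissoc room-id)))
```

Calling `sweep!` from a scheduled task corresponds to the periodic check described above. Note that a room with no downstream at all also counts as empty in this sketch.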

Arnaud Geiser 09:01:39

Hello Francesco! Do you have some code to share based on the approach you describe? Otherwise, I will give you an answer based on your message. 🙂

Francesco Pischedda 09:01:32

Hi Arnaud! Thanks for jumping into my question so quickly! 🙂 Here is the code of the whole application: https://github.com/fpischedda/unrefined
The state atom is handled in https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/unrefined/state.clj#L9.
User connection is handled in https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/refinements/handlers.clj#L13 on the ring side, and in https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/refinements/events.clj#L13 I do the connection between source and user sink.
And finally, https://github.com/fpischedda/unrefined/blob/main/src/clj/fpsd/unrefined/mr_clean.clj is the cleanup part.
Let me know if I have to clarify something! 🙏

Arnaud Geiser 10:01:07

The overall approach you are describing makes sense to me.
> • Once user disconnects, their sink stream is marked as drained, so periodically I look at all the source stream and, using s/downstream to get all connected stream, if all downstreams are drained I close the source stream and remove it from the atom
Your approach works, but you could also subscribe to on-drained [1] instead of periodically checking the state of your streams. I would still keep your mechanism in place, though with a longer interval (every hour or two?).
> are there better ways to broadcast messages to users in a room?
For what it's worth, I would have done it exactly this way.
> is there a better way to handle user disconnecting and cleanup an empty room? Is the current approach leaking resources?
You should be safe with your approach. Talking about Netty internals, as soon as a channel becomes inactive, the stream is closed, so you should be fine: https://github.com/clj-commons/aleph/blob/master/src/aleph/http/server.clj#L362 However, you may want to protect your system by not keeping channels active forever.
> • I was thinking about using an event-bus with a topic for each room and user subscriptions to that topic? How does it scale? Does it automatically cleanup disconnected users?
Here I cannot say, I have never used the event-bus part of manifold. We have no open issues regarding it, but I don't think it is either battle-tested or heavily used. It is not a lot of code, so if you want to take a look: https://github.com/clj-commons/manifold/blob/master/src/manifold/bus.clj
[1]: https://aleph.io/codox/manifold/manifold.stream.html#var-on-drained
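The on-drained suggestion could look roughly like this. It is a sketch under assumptions: `rooms`, `close-room-if-empty!` and `join!` are hypothetical names, and the callback simply re-runs the same "are all downstreams drained?" check that the periodic job performs.

```clojure
(require '[manifold.stream :as s])

;; room id -> room stream (hypothetical registry)
(defonce rooms (atom {}))

(defn close-room-if-empty!
  "Closes and forgets the room once every connected user stream is drained."
  [room-id]
  (when-let [room (get @rooms room-id)]
    ;; s/downstream returns [description sink] pairs
    (when (every? (fn [[_ sink]] (s/drained? sink)) (s/downstream room))
      (swap! rooms dissoc room-id)
      (s/close! room))))

(defn join!
  "Connects a new user stream to the room and registers the cleanup callback."
  [room-id]
  (let [room (-> (swap! rooms update room-id #(or % (s/stream)))
                 (get room-id))
        user (s/stream)]
    (s/connect room user)
    ;; Runs once the user stream is closed and fully consumed,
    ;; for example after the client disconnects.
    (s/on-drained user #(close-room-if-empty! room-id))
    user))
```

With this in place the cleanup happens when the last user leaves, and a periodic sweep is only a fallback.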

Francesco Pischedda 10:01:21

Thanks a lot! This is my first time using aleph and I was afraid that my approach was not ideal. BTW the APIs offered by both aleph and manifold are great! I will look at the internals of event-bus, even if I think I will stick to the current approach. Thanks again for your help!

Arnaud Geiser 10:01:33

Yes, Zach Tellman did amazing work on the API and the provided abstractions. The documentation is a bit lacking, and some internals and design choices are unexplained, but both libraries do their jobs pretty well.

Matthew Davidson (kingmob) 06:01:11

Hi Francesco. First, some terminology, because the Manifold docs are a little vague on this. Draining applies to sources, and closing applies to sinks. The default stream is both a source and a sink. So, when you say the user's sink stream is drained, do you mean the user has read all the messages they'll get? I ask because it may be preferable to close the user's stream when they're done with it, instead of checking whether the stream is drained, for the reasons below.
It also sounds like you're duplicating a bit of Manifold's default behavior. If you close a downstream, Manifold's default behavior is to close the upstream when there are no more open downstreams, unless the upstream was created with permanent? set to true (see stream*). The catch is that Manifold can delay cleanup until necessary. E.g., see the code buried in manifold.stream.graph/handle-async-put; Manifold won't close the upstream until it runs out of open sinks to put to. So, if you stop putting messages on the upstream after you close the final downstream, the upstream could hang around for a while.
Your use case is a pub/sub architecture, and is exactly what event-bus was built for. It automatically handles cleanup for you if you close the user's subscription stream. I suggest giving it a try.
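The closed/drained distinction can be seen on a single buffered stream. This is a small illustrative sketch; the var names are hypothetical, and the buffer size of 10 is only there so that the put completes without a consumer.

```clojure
(require '[manifold.stream :as s])

(def st (s/stream 10))

@(s/put! st :msg)   ; buffered, so this completes immediately
(s/close! st)       ; the sink side no longer accepts messages

(def closed-after-close (s/closed? st))   ; true: closed right away
(def drained-before-take (s/drained? st)) ; false: :msg is still buffered

(def first-take @(s/take! st))   ; :msg
(def second-take @(s/take! st))  ; nil, nothing left to take

(def drained-at-end (s/drained? st))      ; true: closed and fully consumed
```

A stream that is closed with nothing buffered reports both flags as true at once.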

Francesco Pischedda 09:01:26

Hi @Matthew Davidson (kingmob)! Thanks for the additional feedback. Regarding the terminology, yes, I am a bit confused 😅 My assumptions are (were?):
• the room stream is both a sink (it receives events to be broadcast) and a source (it sends events to user streams), but its main "role" here is to broadcast, so I have called it the source
• user streams, s/connected to room streams, are again both sinks (they receive events from the room) and sources (they send SSE data to the browser), and for simplicity I've called them sinks
Probably I have oversimplified and I need to adjust my understanding of the terminology, thanks for the feedback! 🙏
> So, when you say the user's sink stream is drained, do you mean the user has read all the messages they'll get?
This is what I have observed:
• I have created a room and connected a user to it; using s/downstream on the room's stream I get the user's stream which, when inspected, shows these properties

Class: manifold.stream.default.Stream
Value: "<< stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type \"manifold\", :sink? true, :closed? false, :pending-takes 1...

--- Fields:
  "__closedCallbacks" = (  )
  "__drainedCallbacks" = (  )
  "__isClosed" = false
  "__isDrained" = false
• closing the web page, the user's stream is still connected to the room stream and looks like this
Class: manifold.stream.default.Stream
Value: "<< stream: {:pending-puts 0, :drained? true, :buffer-size 0, :permanent? false, :type \"manifold\", :sink? true, :closed? true, :pending-takes 0, ...

--- Fields:
  "__closedCallbacks" = (  )
  "__drainedCallbacks" = (  )
  "__isClosed" = true
  "__isDrained" = true
Before, I didn't notice that it was both drained and closed, and based on your comment I should use s/closed? instead of s/drained? to understand if all users have closed their page/connection, is that correct? (The room stream is still not closed or drained at this point, so I should look at all downstream streams.)
I understand that the room's stream should be closed automatically at some point when there are no active downstreams, but events are generated only after user actions, so if there are no users generating events there will be no new puts to mark the upstream as closed. So it looks like the manual cleaning process is still needed.
> Your use case is a pub/sub architecture, and is exactly what event-bus was built for
I've learned about event-bus only recently and I want to give it a try, but I have a couple of concerns:
• How does it scale? Should I have a bus for each room, or one for the whole system with a topic per room?
• How does cleanup work with the event-bus? Should I expect it to automatically remove subscriptions when users disconnect? Should I manage event-bus instances like I do for the stream approach?
I really need to read its code and get familiar with it before making a decision 🙂 Thanks again for your help!

Francesco Pischedda 13:01:04

After reading the event-bus sources I decided to try it out and it is working properly; many of my concerns have also disappeared. If you are curious, the changes that replace manual stream management with event-bus are in https://github.com/fpischedda/unrefined/pull/37
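For readers who have not seen event-bus, a topic-per-room setup might look like the sketch below. It is not the code from the pull request: `room-bus`, `sse-response` and `broadcast!` are hypothetical names, one bus for the whole system is an assumption, and the `data: ...` framing is the plain SSE wire format.

```clojure
(require '[manifold.bus :as bus]
         '[manifold.stream :as s])

;; One bus for the whole system, one topic per room (an assumption).
(def room-bus (bus/event-bus))

(defn sse-response
  "Ring-style response whose body is the user's subscription stream."
  [room-id]
  {:status  200
   :headers {"content-type" "text/event-stream"}
   :body    (bus/subscribe room-bus room-id)})

(defn broadcast!
  "Publishes event to every subscriber of the room topic.
  Returns a deferred; with no subscribers nothing is delivered."
  [room-id event]
  (bus/publish! room-bus room-id (str "data: " event "\n\n")))
```

When a subscription stream is closed, for instance after the client disconnects, the bus drops it from the topic, so `(bus/active? room-bus room-id)` can be used to check whether a room still has subscribers.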

Arnaud Geiser 13:01:37

Thank you Francesco for sharing!

Arnaud Geiser 13:01:05

We should definitely have some documentation regarding event-bus on Aleph

Francesco Pischedda 13:01:05

I would be more than happy to try to write something about it but given my tiny experience with it I am not sure it would be a good idea 😅 Maybe I can write about my journey from stream to event-bus

👍 3
Matthew Davidson (kingmob) 07:01:12

@Francesco Pischedda Sorry for the delay; I've been traveling. Glad to hear it worked out, though! And PRs to clarify and improve docs are always welcome.

🙏 2