hi all, thanks for all of your work on Pedestal. I was just browsing the docs site and saw that when I do a search, the documentation for older versions of Pedestal precedes the docs for newer versions. I wonder if it might be a positive change to arrange the results so that newer documentation appears first. I'm very grateful for the guides, btw!
Good idea, I’ll need to do some Antora logic to figure out how to make that happen.
I adapted cljdoc to the breaking change in pedestal.test response headers in 0.8.1, but was a bit surprised that I also had to change "Accept" to "accept" in my request :headers for my tests. If this does not ring any bells, I could try to create a minimal repro.
I think I did change that; Ring :headers keys are supposed to be lower case. I did change the code in io.pedestal.connector.test to pass request headers through unchanged; that may have been a mistake, and they should probably be lower-cased. Your code should not have to care, so I may need to revert that small part.
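A minimal sketch of the lower-casing option, assuming request headers are a map from header names (strings or keywords) to values. `lower-case-header-keys` is a hypothetical helper for illustration, not Pedestal's code in io.pedestal.connector.test.

```clojure
(require '[clojure.string :as string])

(defn lower-case-header-keys
  "Return a copy of the headers map with every key lower-cased,
  matching the Ring convention for request :headers."
  [headers]
  (into {}
        (map (fn [[k v]] [(string/lower-case (name k)) v]))
        headers))

;; Usage: normalize headers before building the test request
(lower-case-header-keys {"Accept" "application/json" "X-Trace-Id" "abc"})
;; => {"accept" "application/json", "x-trace-id" "abc"}
```

With this in place, a test can keep writing "Accept" and the request still carries "accept".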
Ok, cool, no need for an issue, then?
No, but I may need to cut an 0.8.2.
Yeah, if you find my report above is valid, then I expect so.
I made a small Pedestal server based on the SSE interceptor example at https://pedestal.io/pedestal/0.8/reference/server-sent-events.html#_making_an_sse_interceptor, but for some reason the SSE responses don’t seem to be streaming.
My code is in https://gist.github.com/yurivish/9c84ec6576e83a4ff561b4bd11857903. It emits 4 events spaced 250 ms apart, but the "streamed" events all show up at the end rather than incrementally.
Do I need to do anything special to enable streaming or disable buffering or explicitly flush?
I'm testing with `curl --no-buffer 'http://localhost:8080/events'` and in the browser.
In both cases the response stays empty for a full second, and then all four events show up, even though the code emits them 250ms apart (confirmed via println).
Edit: Figured it out. Let me know if you think the docs would benefit from using a `future`, or if there's a more idiomatic way to do the channel sends asynchronously; I can make a PR to the docs.
@jeremys out of curiosity, what libraries do you prefer for building a web server – do you choose pedestal in production or was it just something you were exploring with datastar?
If I set the buffer size to less than 4, the response hangs. If I set it to 4, all four events are printed at once after one second (API docs: https://pedestal.io/api/0.8/io.pedestal.http.sse.html).
```clojure
(sse/start-stream on-stream-ready context 1 4)
```
I don't see anything printed in the debug logs while the events are emitted, and I don't see any relevant HttpConfiguration settings for Jetty that might be responsible, though that's a bit beyond my current level of knowledge.
Is it possible this is a bug? It seems too simple to be a bug, since it’s also the docs example.
Aha – it's just that the "ready" handler runs on the same thread, so nothing gets streamed until it returns. So this works:
```clojure
(require '[clojure.core.async :refer [>!! close!]])

(defn on-stream-ready
  [event-chan _context]
  ;; Return immediately and emit the events from another thread,
  ;; so Pedestal can start streaming the response.
  (future
    (dotimes [i 4]
      (println "count" i)
      (>!! event-chan {:name "countdown" :data i})
      (Thread/sleep 250))
    (close! event-chan)))
```
I also noticed that the `Connection: close` header is sent when streaming SSE responses, at least for HTTP/1.1 requests. Reading MDN's page on the Connection header (https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Connection), it seems like it might be undesirable, since `close` "indicates that either the client or the server would like to close the connection". Out of curiosity, was this an intentional choice?
Are you using jetty or http-kit?
Ah, yes, jetty.
Yep. I might have gotten stuck on this because I'm new to Clojure (I last used it seriously ~10 years ago!) 😄, so the async behavior in the example threw me off; maybe it's more intuitive to someone more practiced.
I think the main thing is not to stay inside the callbacks for long: spin up a thread or a CSP-style go block instead.
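For comparison with the `future` version in this thread, a minimal sketch of the same countdown using a core.async `go` block: the callback returns right away and the block parks (rather than sleeps) between events. It assumes the same `(fn [event-chan context] ...)` callback shape and event maps as the docs example; it is a sketch, not the official docs code.

```clojure
(require '[clojure.core.async :as async :refer [go >! <! timeout close!]])

(defn on-stream-ready
  "Emit four countdown events 250 ms apart without blocking the
  calling thread: the go block parks instead of sleeping."
  [event-chan _context]
  (go
    (dotimes [i 4]
      ;; >! parks until the event channel has room
      (>! event-chan {:name "countdown" :data i})
      ;; park for 250 ms instead of calling Thread/sleep
      (<! (timeout 250)))
    (close! event-chan)))
```

The `future` version holds a real thread for the life of the stream; the go block only occupies a thread from core.async's pool while it is actually putting an event.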
Pedestal seems great though. Continuing to poke around, I made a local fork of the SSE support so I could add `retry` support (https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#retry) and play around with making my own little server-side Datastar SDK (https://github.com/starfederation/datastar/blob/develop/sdk/ADR.md).
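For reference, `retry` is just another field in the SSE wire format: each field is a `name: value` line, multi-line data becomes several `data:` lines, and a blank line ends the event. A minimal sketch of a formatter, assuming an event map with optional `:name`, `:id`, `:retry` (milliseconds) and `:data` keys; `format-event` is a hypothetical name, not Pedestal's implementation.

```clojure
(require '[clojure.string :as string])

(defn format-event
  "Render an event map as SSE wire text. Every field is optional;
  multi-line :data is split into one data: line per line."
  [{event-name :name :keys [id retry data]}]
  (let [sb (StringBuilder.)
        field (fn [k v] (.append sb (str k ": " v "\n")))]
    (when event-name (field "event" event-name))
    (when id (field "id" id))
    ;; retry tells the browser how many ms to wait before reconnecting
    (when retry (field "retry" (long retry)))
    (when (some? data)
      (doseq [line (string/split-lines (str data))]
        (field "data" line)))
    ;; a blank line terminates the event
    (.append sb "\n")
    (str sb)))

(format-event {:name "countdown" :retry 3000 :data 1})
;; => "event: countdown\nretry: 3000\ndata: 1\n\n"
```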
If you are open to using http-kit instead of jetty I made a pedestal adapter for the Datastar clojure SDK https://github.com/JeremS/datastar-pedestal-http-kit/tree/main. Cheers!
I saw! Thank you for your great work. I’m doing this as a learning exercise as much as anything — gives me more of a feel for the space. :) But I might also not have realized that I could use the standard d* adapter for http-kit even with Pedestal since I didn’t understand how that would interact with Pedestal’s sse support (I guess you’d just not use that and use the d* functions instead?)
The standard adapter for http-kit will not work out of the box with pedestal. The one I linked will. Have fun with your explorations!
oh, d'oh, didn't realize that was a different project of yours. It's evening here but I will definitely have a look later as I am curious how you did it, thanks.
I don't quite know what I'm doing yet, but I think what I started working towards today is more of a Pedestal-specific set of d* functions that are agnostic to the underlying server implementation, with functions like this:
```clojure
(require '[clojure.core.async :refer [>!!]]
         '[clojure.string :as string])

(defn dispatch
  "Send an SSE event in datastar format to the channel ch.
  Accepts event :name, :id, :retry, :data.
  See "
  [ch opts]
  ;; >!! returns false once ch has been closed
  (>!! ch opts))

;; elements-list (defined elsewhere) turns the elements into
;; "elements ..." data lines
(defn- patch-elements-core
  [elements & {:keys [selector mode use-view-transition] :as opts}]
  (let [data (cond-> [] ; a vector (not nil) keeps the lines in order
               selector (conj (str "selector " selector))
               use-view-transition (conj (str "useViewTransition " use-view-transition))
               mode (conj (str "mode " mode))
               elements (into (elements-list elements)))]
    (assoc opts
           :name "datastar-patch-elements"
           :data (string/join "\n" data))))

(defn patch-elements
  "See "
  ([ch elements] (dispatch ch (patch-elements-core elements)))
  ([ch elements opts] (dispatch ch (patch-elements-core elements opts))))
```
If you're focused on Pedestal, this makes sense. It lets you use the core.async channel as your SSE connection, and it will work with every server Pedestal supports. The only thing that is not ideal in this model is detecting when a client has disconnected. Unless I am wrong, you need to test whether the channel is closed before sending an event, so you only find out whether the connection is still open at that point. The other solution is to use a heartbeat. My thing uses the underlying AsyncChannel from http-kit. It allows for an on-close callback that runs as soon as http-kit detects the disconnection, but it is http-kit only.
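On the core.async side, `>!!` returns `false` when the channel has already been closed, so a sender can treat the return value of each put as its disconnection check (the interceptor later in this thread does the same with `(when (patch-elements ch ...) ...)`). A minimal sketch; `send-all!` and the `on-close` callback are hypothetical names. As discussed in this thread, the channel may only be closed one or more events after the client actually went away, so "accepted by the channel" does not mean "received by the client".

```clojure
(require '[clojure.core.async :as async :refer [>!!]])

(defn send-all!
  "Put each event on ch until a put fails because ch is closed, then
  call on-close. Returns the number of events the channel accepted."
  [ch events on-close]
  (loop [[event & more] events
         sent 0]
    (cond
      (nil? event) sent
      (>!! ch event) (recur more (inc sent))
      :else (do (on-close) sent))))
```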
oh, thanks, that's interesting. I just had a look at that and the SSE implementation uses the heartbeat to detect when it's disconnected, and calls the optional :on-client-disconnect callback. It looks like it will also detect when sending an event failed because the client closed the connection: https://gist.github.com/yurivish/bcf9ab703b71230baa6a34040b80288f#file-sse-clj-L128
I've been focusing on adding a first cut at gzip compression. It's here if you're curious: https://gist.github.com/yurivish/bcf9ab703b71230baa6a34040b80288f
(i.e. I think you can just send events without checking; if the connection has closed in the meantime, it will be detected and your "on close" callback will be invoked)
though you do get an ugly exception in the log
```
[async-io-57] WARN io.pedestal.http.impl.servlet-interceptor - {:msg "The pipe closed while async writing to the client; Client most likely disconnected.", :line 144, :src-chan #object[clojure.core.async.impl.channels.ManyToManyChannel 0x59f0163f "clojure.core.async.impl.channels.ManyToManyChannel@59f0163f"]}
org.eclipse.jetty.io.EofException: Broken pipe
at org.eclipse.jetty.io.SocketChannelEndPoint.flush(SocketChannelEndPoint.java:118)
at org.eclipse.jetty.io.WriteFlusher.flush(WriteFlusher.java:419)
at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:272)
at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:251)
at org.eclipse.jetty.io.AbstractEndPoint.write(AbstractEndPoint.java:368)
at org.eclipse.jetty.server.internal.HttpConnection$SendCallback.process(HttpConnection.java:863)
at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:253)
at org.eclipse.jetty.util.IteratingCallback.iterate(IteratingCallback.java:232)
at org.eclipse.jetty.server.internal.HttpConnection$HttpStreamOverHTTP1.send(HttpConnection.java:1437)
at org.eclipse.jetty.server.HttpStream$Wrapper.send(HttpStream.java:185)
```
followed by
```
[async-io-57] INFO d*.sse - {:msg "Response channel was closed when sending event. Shutting down SSE stream.", :line 128}
```
At a cursory glance the compression looks fine, though the biggest gains come when you start using algorithms like Brotli. For a long-lived stream of events, Brotli can get you crazy compression ratios. The disconnect problem with Jetty is a bit more annoying than this: you need to send events until it throws, and it can take more than one sent event for it to actually throw. From my informal tests, if you send a tiny payload it takes 2 events to actually throw. This means that you might think the client received an update when it actually hasn't.
It's also why I focus on http-kit for now.
huh, it looks like you're right re: multiple events. I had naïvely thought that jetty, being older, would have less odd edge cases, but maybe in this case that is not true
i wonder if this sse implementation on top of http-kit would behave correctly..
I went directly to http-kit's AsyncChannel, eschewing the core.async one. That way I get the on-close callback. Make sure to use http-kit v2.9.0-beta2 though; v2.8.1 has an issue on that front.
Looks like with http-kit my implementation still sends two events after the client closes the connection (with the heartbeat set to a very large number, i.e. disabled), and I can get Jetty to send three extra events by Ctrl-C'ing the curl request right after making it.
😱
```
> curl -s -H "Accept-Encoding: gnotzip" ''
event: datastar-patch-elements
data: elements <h1>1</h1>
^C⏎
```
w/ http-kit:
```
sending 1
sent 1
sending 2
sent 2
sending 3
sent 3
sending 4
sent 4
sending 5
sent 5
sending 6
sent 6
sending 7
sent 7
sending 8
sent 8
sending 9
sent 9
sending 10
sent 10
[async-io-5] INFO d*.sse - {:msg "Event channel has closed. Shutting down SSE stream.", :line 126}
Leaving SSE context.
```
using this interceptor:
```clojure
(def events-interceptor
  (d*/sse-interceptor
   ::events
   (fn [ch _context]
     (future
       (loop [i 1]
         (when (<= i 10)
           (println "sending " i)
           ;; patch-elements returns false once ch is closed
           (when (patch-elements ch [:h1 i])
             (println "sent ", i)
             (Thread/sleep 1000)
             (recur (inc i)))))
       (close! ch)))))
```
edit: investigating whether I messed something up, since this feels very suboptimal...
I think I am just going to stick to my implementation and Jetty for now, and not assume that event sent = event received, since that's not guaranteed even if you can detect immediately when the connection is closed.
the bug above may be when the heartbeat is set to 999, which might lead to http-kit never noticing that the client closed the connection? (pedestal 0.8.1, http-kit 2.9.0-beta2)
If I am correct, adding the core.async channel means you need to lose 1 event before the disconnection is detected. You should lose 1 event with http-kit (on the second send the channel is closed) and 2 events with Jetty (on the third send the channel will be closed). At least that's what I remember figuring out when I thought about this some months ago...
You are right, I made a typo... It's not user-facing, so I'll need to correct it.
Typically, if you use the AsyncChannel the way it's presented in the http-kit wiki (https://github.com/http-kit/http-kit/wiki/3-Server#using-http-kits-unified-api), calling as-channel in the handler, middleware/interceptors that modify the response won't work: the response is sent to the client before middleware or interceptors run their :leave part.
The response-committed channel puzzled me at first, but that's the trick used to defer sending the response until the whole interceptor stack is done. It's a really neat way of doing it, and I have taken inspiration from this pattern in the SDK.
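The deferral idea can be illustrated without any Pedestal or http-kit types: the streaming side waits on a signal that is only delivered once the interceptor chain has finished its `:leave` phase, so anything added on the way out (headers, for example) is in place before bytes start flowing. A minimal sketch using a plain Clojure `promise` as the signal; `start-streaming!` and `finish-chain!` are hypothetical stand-ins for Pedestal's response-committed channel, not its API.

```clojure
(defn start-streaming!
  "Run write-body! on another thread, but only after `committed`
  (a promise) has been delivered with the final response map."
  [committed write-body!]
  (future
    ;; deref blocks until the :leave phase has finished
    (write-body! @committed)))

(defn finish-chain!
  "Stand-in for the end of the interceptor chain: a :leave step adds a
  header, then the final response is handed to the waiting writer."
  [committed response]
  (deliver committed (assoc-in response [:headers "Content-Encoding"] "gzip")))
```

Because the writer only ever sees the delivered value, it cannot race ahead of interceptors that modify the response.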
It might make sense to leave the typo since it’s spelled that way in Pedestal, since the consistency makes searching easier: https://github.com/search?q=repo%3Apedestal%2Fpedestal+commited&type=code
I have to say I am having a lot of fun with this. Might try to see if I can prototype something for work with it. Current design iteration for an SSE protocol supporting uncompressed/gzip/brotli: https://gist.github.com/yurivish/d032000c23f0443de98202bff736b741
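Since the design covers gzip, one JVM detail worth checking: `java.util.zip.GZIPOutputStream` only pushes compressed bytes out on `flush()` if it was constructed with `syncFlush` set to `true`; otherwise small events can sit inside the Deflater until more data accumulates or the stream is finished, which looks just like the buffering problem earlier in this thread. A minimal sketch, with a `ByteArrayOutputStream` standing in for the response stream (an assumption for illustration); `gzip-sink` and `write-event!` are hypothetical names.

```clojure
(import '(java.io OutputStream ByteArrayOutputStream ByteArrayInputStream)
        '(java.util.zip GZIPOutputStream GZIPInputStream))

(defn gzip-sink
  "Wrap the response stream so that every flush() emits the data
  written so far as decodable gzip (syncFlush = true)."
  [^OutputStream out]
  (GZIPOutputStream. out true))

(defn write-event!
  "Write one formatted SSE event and push it to the client immediately."
  [^GZIPOutputStream gz ^String event-text]
  (.write gz (.getBytes event-text "UTF-8"))
  ;; with syncFlush, this forces the Deflater to emit what it has
  ;; instead of keeping the bytes buffered internally
  (.flush gz))
```

Each sync flush adds a small fixed overhead to the compressed stream; that is the price of delivering events one at a time.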
One thing to think about is that you may want to send/flush SSE events as soon as they are written. That is what the SDK does. Doing it that way, you don't need `write-bytes` and `send-bytes`: you'd just have a send function that writes to the OutputStream and flushes immediately. That means you could let go of the SSE protocol and the records.
You could have just a send function that represents your SSE connection. That function would close over the AsyncChannel and the ByteArrayOutputStream (when compressing). Called with the SSE event to send it would, well, send it. Called with 0 arguments it would close the connection. Doing it that way would mean no protocol, no records. Just an idea 😁
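A minimal sketch of the single-function idea, assuming a plain `java.io.OutputStream` as the transport (the real version would close over http-kit's AsyncChannel, whose API is not shown here). `make-sse-send` is a hypothetical constructor: the returned function writes and flushes an event when called with a map, and closes the stream when called with no arguments. Writing each line's bytes straight to the stream also avoids joining the lines into one string first.

```clojure
(import '(java.io OutputStream ByteArrayOutputStream))

(defn make-sse-send
  "Return one function that stands for an SSE connection on `out`:
  (sse-send {:event name :data lines}) writes the event and flushes it,
  (sse-send) closes the connection. :data is a sequence of lines."
  [^OutputStream out]
  (let [write-line! (fn [^String line]
                      ;; write each line's bytes directly, no joined string
                      (.write out (.getBytes line "UTF-8"))
                      (.write out (int 10)))] ; newline
    (fn sse-send
      ([] (.close out))
      ([{:keys [event data]}]
       (when event (write-line! (str "event: " event)))
       (doseq [line data] (write-line! (str "data: " line)))
       (write-line! "") ; blank line terminates the event
       (.flush out)))))
```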
thank you, that is interesting! let me think about that.
My protocol initially had just a send function, but using that API to flush entire SSE events at a time would require constructing them, eg. joining a vector of SSE lines into a string. So I changed the API surface to decouple writing and flushing so that I can avoid constructing that string, instead calling write-bytes sequentially on the individual lines. Does that seem like a reasonable justification to you?
That's separate from the question of protocol vs. function, which I can still do using a multi-arity function and is an interesting idea. Will ponder trade-offs over lunch!
Edit: I suspect a couple of things I said here don’t quite make sense…
I think I understand what you mean. When not compressing you send line after line, skipping the concatenation; when compressing, the concatenation happens in the encoder. It's just that my brain tends to want to minimize protocol/record usage... One unrelated thing that now comes to mind is the case where more than one thread wants to send an event. It may never happen, depending on how you architect your backend. The Go implementation I studied uses a lock, and I copied that. If you know only one thread can send events, there's no need.
Good point. I should at least add a note to the documentation that this is not safe to be used from more than one thread. In your uses, have you sent to the same SSE connection from multiple threads?
In my uses no I haven't. If you have some realtime reactive thing going that's something to consider.
Yeah. I don't actually have a specific use case at this exact moment. 😄 But a lot of the work I do could benefit from being server-oriented so I'm setting up this base for future operations. From what I've seen of eg. Anders's demos, even in the realtime interactive case you want to send from a single thread so that you can debounce updates.
Maybe under that is the fact that for the "fat morph" approach, all of your state needs to be sent together, and any independent updates would clobber each other. Haven't thought it through though, so maybe this is not a good way to think about it.
That's the thing: in the SDK I have quite a generic setup, so I added the lock, which also allows the user to hold the lock and batch several events, preventing events from several threads from interleaving. Now, is having a backend that does this a good idea? Probably not. Still, since it is a generic SDK, it covers the use case. Anders is even thinking of going further and ditching the single vthread/go-loop in front of each SSE connection. The idea would be to run a render loop at a fixed rate that re-renders all clients, basically a game rendering loop. That way you don't pay for context switching from vthread to vthread when rendering. Same question: do you need to do it this way? 🤷
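A minimal sketch of the lock-plus-batching idea, assuming a send function is shared between threads: one lock per connection serializes individual sends, and because JVM monitors are reentrant, a caller can hold the lock across several sends so the batch is never interleaved with another thread's events. `make-locked-connection` and `with-batch` are hypothetical names; this is not the SDK's API.

```clojure
(defn make-locked-connection
  "Wrap send-fn with a per-connection lock. JVM monitors are reentrant,
  so a thread already holding the lock can keep sending."
  [send-fn]
  (let [lock (Object.)]
    {:lock lock
     :send (fn [event] (locking lock (send-fn event)))}))

(defn with-batch
  "Call f while holding the connection's lock, so every event f sends
  goes out back to back, without events from other threads in between."
  [{:keys [lock]} f]
  (locking lock (f)))
```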
That's the same pattern you used in your broadcast benchmark (https://github.com/JeremS/datastar-pedestal-http-kit/blob/7760757a715252370e5ae5f3e9b117aa7766933f/src/dev/bench/broadcast.clj#L13), right? If I understand right, one could run it at a fixed frame rate like a game engine, or use it to update all clients whenever any client triggers a state change, e.g. by having a single renderer thread that listens on a channel for "rerender all clients" requests (maybe the coordination around this channel is the context switching you mean).
The examples in the SDK's repo (or in this repo) are more about me figuring out what is possible and whether the SDK lets you do it than about teaching best practices or benchmarking. This little broadcast example lets me open several curl processes and verify that headers are properly sent, messages are broadcast, and disconnections are detected in both cases. I spend so much time in the Datastar Discord that I sometimes forget that not everybody does. The best explanation for what I mentioned might be in the latest Datastar podcast episode: Anders is on as a guest, and I think he explains how his demos work as well as his idea about the rendering loop. Here it is: https://www.youtube.com/watch?v=DFYuHGKMit8
I did listen to that one!
If I understand, there is a similarity between your broadcast architecture and Anders's game loop idea, both of which differ from the persistent vthread per conn.
In the case of persistent vthreads, each vthread is responsible for rendering (either on a regular cadence, or at the direction of some central coordinator). In either case, the vthread needs to wake up, do its work, then park. When a new connection comes in, it runs its own loop for rendering.
In the game loop / broadcast example, the individual vthreads live only long enough to conj their conn onto the global connections atom. Then, a single separate long-lived vthread (present in the game loop but absent from your example) would render to all of these connections in a loop. Eg. imagine if instead of curl, your example ran a long-lived vthread with something like this: (loop [] (Thread/sleep 100) (broadcast (next-render)) (recur))
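A minimal sketch of that shape, assuming each connection is represented by a send function that returns `false` once the client is gone (as suggested earlier in the thread): connection handlers only add their send function to a shared atom, and one render loop pushes each frame to every connection and drops the dead ones. `connections`, `broadcast!` and `render-loop!` are hypothetical names, and `next-render` is whatever produces the current view.

```clojure
(defonce connections (atom #{}))

(defn broadcast!
  "Send one rendered frame to every connection; forget connections
  whose send function reports failure (returns false)."
  [frame]
  (doseq [send-fn @connections]
    (when (false? (send-fn frame))
      (swap! connections disj send-fn))))

(defn render-loop!
  "Run a fixed-rate render loop on one thread until running? is false."
  [running? interval-ms next-render]
  (future
    (while @running?
      (broadcast! (next-render))
      (Thread/sleep (long interval-ms)))))
```

A new connection then costs one `swap!`, and rendering happens on a single thread regardless of how many clients are attached.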
That's part of the idea, except that you could split the rendering load across several cores (Anders is thinking of constraining the server to either read from the DB or write to the DB, not both at the same time). There is a lot to refine, and this is not a one-size-fits-all architecture. However, it's interesting to see how far he can push a $5-a-month server with his demos.
yeah, good point. indeed, it is pretty amazing that a language as expressive and robust as clojure can run so quickly!
When I started working on the SDK I tried http-kit, Ring Jetty, rj9a and Pedestal 0.7.x. Right now the server that handles SSE the best is http-kit (starting at v2.9.0-betaX). It is the only one of the servers I tested that handles a client disconnecting nicely. Jetty has the problem of needing to send events until an exception is thrown, and that problem was amplified when using core.async channels in Pedestal 0.7. rj9a has an issue where you can't easily keep a connection open, even using the Ring async API. Aleph might be good though; I just haven't tested it yet.
So when it comes to using SSE a lot I'd say Http-kit is nice. As for what libraries to use it with, if you like Pedestal I think it's a good choice. I mean if Nubank uses it, it must do something right!
Thanks for doing the research. Yes, agreed 😄
@jeremys If you're curious I went poking around with Claude and came up with a potential improvement to client disconnection detection (2 lines): https://github.com/pedestal/pedestal/pull/974 I don't think this solves the delays you mentioned that have to do with the core.async channel response pattern, but in my local testing it does seem to enable fast detection of client disconnects while remaining composable with the rest of Pedestal.
That solution will close the core.async channel faster, but the user still has to check whether it is closed. It's probably the best trade-off if you want to use the core.async channel and not directly manage the http-kit AsyncChannel.
Yeah. I might try writing my own http-kit-specific sse adapter, since Pedestal already supports returning an http-kit AsyncChannel as the body of a response. I would not be surprised if I wind up close to what you wrote, but I am enjoying implementing my own minimal datastar stack. 😄
(hello curse of lisp)
If you go Pedestal + http-kit only, it will be a bit shorter than what I made.
cool. Plus I think I want to avoid the datastar sdk since I seem to have written a smaller version with just the bare minimum - https://gist.github.com/yurivish/3b8f99255be382af7dbad9b03c088175
Anders, who is well known in the community (for the Game of Life and billion checkboxes demos), uses his own implementation, not the SDK. It's not that much code to maintain if you implement just what you need, and you can get away with something very tiny. The killer feature you want, though, is Brotli compression. As soon as you do something CQRS-style and keep one SSE stream refreshing the page, you can get crazy compression ratios.
I've been following Anders's work and comments on the d* Discord with great interest and didn't realize he also uses his own implementation; that's cool. I've heard good things about Brotli. My impression is that, at least in general, encoding into it is relatively CPU-intensive, so I'm curious about what makes Brotli specifically perform so well for highly repetitive SSE streams. Also, checkbox-style demos in particular seem like they would benefit from that, so I wonder how strongly the benefit translates to more standard-looking DOM trees and update patterns. I assume a big part of it is the window size, since my guess is that most of the compression comes from back-references to long strings of repeated text. If that's the case, I wonder whether other compression algorithms could be adjusted to reach comparable compression ratios. The gzip (DEFLATE) format has a fixed 32 KB window, but I wonder how well other large-window algorithms would do. Edit: looks like Brotli and zstd are possibly the only two large-window compression encodings with wide browser support.
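The window-size guess can be checked on the JVM with `java.util.zip.Deflater`, which implements the same DEFLATE format gzip uses (back-references reach at most 32 KB). The sketch compresses a block of random (incompressible) bytes twice in a row: for a 20 KB block the second copy collapses into back-references, but for a 40 KB block the first copy has already left the window, so the second costs full price. The random data is a synthetic assumption chosen to isolate the window effect; it says nothing about real SSE compression ratios. `deflated-size`, `random-bytes` and `repeat-ratio` are hypothetical helper names.

```clojure
(import '(java.util Random)
        '(java.util.zip Deflater))

(defn deflated-size
  "Size in bytes of `data` after DEFLATE at the default level."
  [^bytes data]
  (let [deflater (Deflater.)
        buf (byte-array 65536)]
    (.setInput deflater data)
    (.finish deflater)
    (loop [total 0]
      (if (.finished deflater)
        (do (.end deflater) total)
        (recur (+ total (.deflate deflater ^bytes buf)))))))

(defn random-bytes [n seed]
  (let [b (byte-array n)]
    (.nextBytes (Random. (long seed)) b)
    b))

(defn repeat-ratio
  "Compressed size of two back-to-back copies of an n-byte random
  block, divided by the compressed size of one copy."
  [n]
  (let [block (random-bytes n 42)
        twice (byte-array (* 2 n))]
    (System/arraycopy block 0 twice 0 n)
    (System/arraycopy block 0 twice n n)
    (/ (double (deflated-size twice))
       (deflated-size block))))
```

`(repeat-ratio 20480)` should come out close to 1.0, and `(repeat-ratio 40960)` close to 2.0; a large-window codec is what lets repeated page renders bigger than 32 KB reference each other.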
Interesting 6-year-old thread (https://news.ycombinator.com/item?id=19678985). Looks like without benchmarking on real workloads and implementations one can’t make any high-confidence claims in this area. But Brotli has much wider browser support according to caniuse (about 95% vs 76% for zstd, with most of the difference coming from older versions of Chrome and iOS Safari). Further reading from Anders: https://www.reddit.com/r/Clojure/s/JPcB1oUnTm
first mini prototype seems to work (thanks to your code for the response-commited-ch hint - that had me confused for a while): https://gist.github.com/yurivish/7115eeec51d4b74f4caa1d58f247b4ee
Also I think it should be spelled “committed”. Probably too late to change now. Funny how this sometimes happens, eg. https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Referer