aleph

2026-02-07T11:49:30.405659Z

Hi, I have a question about how to handle Server Sent Events with Aleph. Consider this snippet:

(defn sse-handler [req]
  (let [stream (s/stream)]

    (do
      (s/put! stream "data: some data!\n")
      (Thread/sleep 5000)
      (s/put! stream "data: some other data!\n"))

    {:status 200
     :headers {"Cache-Control" "no-cache",
               "Content-Type" "text/event-stream",
               "Connection" "keep-alive"}
     :body stream}))
In that case I block the handler in the do block, and 2 events are added to the stream. When the do block is done, the response is returned, and Aleph sends the response, then consumes the stream. What I am looking for, if it is possible, is a way to have:
1. Aleph send the status and headers immediately
2. then start the do block containing the code that sends the SSE events
Is that possible without turning the do into a future/vthread? In other words, I am wondering if I can flush the status and headers immediately, then have some sort of on-open callback mechanism similar to Http-kit's that will run the event-sending code once the connection is open (status and headers flushed). For context, I am making an Aleph adapter for the Datastar Clojure SDK. Cheers,

2026-02-09T09:44:09.762499Z

In http-kit you get what is called a channel; it represents the connection to the client. It has an :on-open callback, and you can also send events through it. Note that, depending on how you use it, it breaks the request-response model of ring: you can start responding in your main ring handler before all middleware are done running. There is a way to mitigate that, however. Pedestal uses an interceptor that runs last on the response. That interceptor initiates the connection, flushes the status and headers, and only then runs the on-open channel callback. Inspired by this, I provide something similar in the Datastar Http-kit SDK. With this you get a normal ring execution model, with the added on-open callback that runs just after all middleware/interceptors are done.

2026-02-09T12:24:54.390899Z

ok, yeah I guess you could compare a channel to a stream? In the end httpkit also calls this channel on a different thread, right? Otherwise I don't understand how it works. I don't know of something like that in Aleph. Is there something specific you want to achieve? I do SSE with aleph with something like this:

(require '[manifold.stream :as s])


(defn sse-output
  [s]
  (format "data: %s\n\n" s))

(defn ring-handler [request]
  (let [sse-stream (s/periodically 5000 (fn [] (sse-output "hello")))]
    {:status 200
     :headers {"content-type" "text/event-stream"}
     :body sse-stream}))

2026-02-09T12:39:41.265959Z

For a mental model you could say that the channel is a fancy Java OutputStream, not a stream in the manifold sense. What you write to it is sent to the client. You don't need to let the adapter run its handler to the end and defer sending events to another thread. You can just write to it as soon as you get it, like you would with an OutputStream or a plain socket. Also, the channel comes in the ring request map.

dergutemoritz 2026-02-09T12:52:09.796669Z

There is no API for sending a response other than returning from the handler, so I'd say your assessment is correct!

dergutemoritz 2026-02-09T12:52:33.640599Z

Can you show us how you achieved this with http-kit using your initial example?

2026-02-09T13:19:40.954729Z

Ok this is going to be a bit of code, I extracted and simplified it from my SDK:

(require '[org.httpkit.server :as hk-server])

;; I have a replacement for Http-kit's as-channel
(defn- as-channel
  "
  Replacement for [[hk-server/as-channel]] that doesn't deal with websockets
  and doesn't call `on-open` itself.

  `on-open` is meant to be called by either a middleware or an interceptor on the return.
  "
  [ring-req {:keys [on-close on-open init]}]

  (when-let [ch (:async-channel ring-req)]

    (when-let [f init]     (f ch))
    (when-let [f on-close] (org.httpkit.server/on-close ch (partial f ch)))

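    ;; ::datastar-sse-response marks the map so that start-responding! recognizes it
    {:body ch ::on-open on-open ::datastar-sse-response true}))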


;; This is the example ring handler using SSE rewritten for http-kit
(defn sse-handler [req]
  (-> (as-channel req ;; The custom as-channel is a modified version of http-kit's.
                  ;; The do from my example becomes a callback
                  {:on-open (fn [ch]
                             (hk-server/send! ch "data: some data!\n" false) ;; http-kit send! instead of manifold put!
                             (Thread/sleep 5000)
                             (hk-server/send! ch "data: some other data!\n" false))})
      ;; as-channel returns a map with the body of the response; I add the status and headers
      (assoc
        :status 200
        :headers {"Cache-Control" "no-cache",
                  "Content-Type" "text/event-stream",
                  "Connection" "keep-alive"})))

;; There we have the code that will take control of flushing headers and starting our callback
;; on "connection open". It is a separate function from the middleware because I can re-use it
;; in an interceptor.
(defn start-responding!
  [response]
  (if (::datastar-sse-response response) ;; Here I check if I have an SSE response
    (let [{on-open ::on-open
           ch      :body} response ;; I get the channel and the callback from the response map
          response (dissoc response :body ::on-open ::datastar-sse-response)]

      ;; Here I send the status and headers, so the base SSE response is sent immediately
      (hk-server/send! ch response false)

      ;; Here I call my on-open callback. At this point the client knows it is getting an SSE stream,
      ;; so the callback can block and take time to execute. Long execution in the callback won't
      ;; time out the client since the status and headers have been sent already.
      (on-open ch))
    response))

;; This is the middleware you want as the first of the chain. This way the managing of the response happens last.
(defn wrap-start-responding
  [handler]
  (fn [req]
    (let [response (handler req)]
      (start-responding! response)
      response)))

2026-02-09T13:23:05.349709Z

There is a bit of http-kit machinery that may be noisy. The main thing is that the ring response contains the Http-kit AsyncChannel and the callback that is the do block in my initial example. The middleware takes those and manages sending data to the client by itself. In that model, instead of letting http-kit get the ring response and do its thing, it lets you take control of the output stream and you manage it yourself.

2026-02-09T13:23:24.977199Z

I copied the middleware trick from pedestal 😁

dergutemoritz 2026-02-09T13:25:09.006419Z

Thanks! Right, there is no equivalent of org.httpkit.server/send! in Aleph's HTTP server

2026-02-09T13:25:43.201449Z

Ok then spawning another thread to let the handler finish is the way then. Thanks!

dergutemoritz 2026-02-09T13:25:56.129319Z

yep but you don't need to spawn a thread explicitly FWIW

dergutemoritz 2026-02-09T13:26:03.587399Z

you can leave that to manifold behind the scenes

dergutemoritz 2026-02-09T13:26:14.899389Z

as @jeroenvandijk did e.g. with s/periodically
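
A minimal sketch of the initial example with the threading left to Manifold, assuming `manifold.deferred/future` is an acceptable stand-in for an explicit future/vthread (it runs its body on Manifold's executor pool). The handler returns the response map right away, and the blocking code only feeds the stream:

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn sse-handler [req]
  (let [stream (s/stream)]
    ;; d/future runs its body on Manifold's executor pool, so the handler
    ;; itself does not block and can return the response map immediately.
    (d/future
      ;; Dereferencing the deferred returned by put! blocks this pooled
      ;; thread until the message has been taken from the stream.
      @(s/put! stream "data: some data!\n\n")
      (Thread/sleep 5000)
      @(s/put! stream "data: some other data!\n\n")
      ;; Closing the stream signals the end of the response body.
      (s/close! stream))
    {:status 200
     :headers {"Cache-Control" "no-cache"
               "Content-Type" "text/event-stream"}
     :body stream}))
```

The blank line after each `data:` line (the `\n\n`) is what terminates an SSE event on the wire.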

2026-02-09T13:27:08.359629Z

Noted, I might take a look at how periodically is implemented then. It might give me ideas. Thanks!

dergutemoritz 2026-02-09T13:28:14.005889Z

I am not familiar with datastar but I would expect that it uses the SSE channel to send page updates triggered by some state changes?

dergutemoritz 2026-02-09T13:28:47.714469Z

so you would probably subscribe to these changes and send an SSE whenever you get notified?

2026-02-09T13:31:04.445629Z

You use SSE to send html fragments to the clients, Datastar will morph the fragment into the DOM. So if you keep the channel around in http-kit, or the stream in Aleph, when changes occur you can indeed use it to send updates to the page.

dergutemoritz 2026-02-09T13:31:21.979639Z

Right

2026-02-09T13:31:32.439749Z

It's like sending a new frame to your screen when one is ready.

2026-02-09T13:31:41.295349Z

somewhat 😁

dergutemoritz 2026-02-09T13:32:05.859629Z

So I guess whether that happens on the original thread which handled the initial request / response exchange or not doesn't really matter

dergutemoritz 2026-02-09T13:32:19.076959Z

in the grand scheme of things 😄

2026-02-09T13:33:23.337879Z

No, it doesn't really matter since we now have vthreads. Still, this whole thread is me being a bit finicky and wondering if I can do the least work possible

2026-02-09T13:33:52.516509Z

Plus it gives me an excuse to learn more about Aleph.

dergutemoritz 2026-02-09T13:36:16.308039Z

Right on!

dergutemoritz 2026-02-09T13:36:30.977769Z

In principle, it should be possible to have a similar API to http-kit in Aleph, too

dergutemoritz 2026-02-09T13:36:43.316459Z

But it would feel a bit unidiomatic and not compose well with how it works otherwise

dergutemoritz 2026-02-09T13:38:34.249789Z

https://cljdoc.org/d/manifold/manifold/0.5.0/doc/execution should be a good starting point to understand the general execution model

dergutemoritz 2026-02-09T13:39:14.121189Z

@jeremys Out of curiosity: what prompted you into writing an Aleph datastar adapter?

2026-02-09T13:40:25.491789Z

Yeah I understand. The Aleph model makes the handler very functional. Going with something like Http-kit channels breaks that and you go into side effect land. I actually had to use the middleware stuff because a naive use of Http-kit as-channel breaks middleware that modify the response...

2026-02-09T13:48:23.143519Z

Right now, in the official SDK for Datastar we support Ring Jetty and Http-kit. Both come with trade-offs. Since the SDK is designed to isolate the generic SSE/Datastar stuff from the actual ring adapter used, I wanted to revisit how hard it is to implement the SDK for another Ring adapter. Also, I don't know Aleph well, but it seems to be mature, and manifold streams may be a good option for building the types of backend we tend to build when using Datastar. The Go people love to embed NATS in their backends. Manifold, while not NATS, still offers interesting primitives to manage event streams.

dergutemoritz 2026-02-09T13:51:25.092149Z

Very nice, keep us posted on how it goes 🙂 Also happy to fix bugs or look into extending APIs when reasonable 👍

dergutemoritz 2026-02-09T13:51:30.237729Z

What's NAT in this context?

dergutemoritz 2026-02-09T13:51:44.075339Z

(assuming it's not Network Address Translation :D)

2026-02-09T13:52:18.422069Z

https://nats.io/about/

2026-02-09T13:54:53.312809Z

Event bus, streams, with persistence. Since it is written in Go you can use it as a library in a Go program. The Datastar gophers love to use it to implement event sourcing and CQRS in their backends.

dergutemoritz 2026-02-09T13:55:12.443749Z

I see

dergutemoritz 2026-02-09T13:55:26.782079Z

Sounds a bit like ZeroMQ except persistent

2026-02-09T13:58:12.031539Z

I am not at all an expert there but yes. You have event/message streams you can publish and subscribe to. The streams can be persisted. It offers the possibility to distribute the stream on several machines...

dergutemoritz 2026-02-09T14:01:05.237359Z

Right, this is different in scope than Aleph / Manifold

dergutemoritz 2026-02-09T14:01:29.880889Z

Manifold is more like core.async / go blocks but different

dergutemoritz 2026-02-09T14:01:52.941929Z

and Aleph is a Netty wrapper using Manifold, i.e. only supports various network protocols

dergutemoritz 2026-02-09T14:02:11.609339Z

But no pub/sub protocol, really

2026-02-09T14:02:27.407629Z

You got an event bus thing in manifold no?

dergutemoritz 2026-02-09T14:02:37.613819Z

ah well Netty has an mqtt implementation I guess that counts but Aleph doesn't expose it 😄

dergutemoritz 2026-02-09T14:03:14.838819Z

Indeed, there's an event bus thing in there but it's not very prominent

dergutemoritz 2026-02-09T14:03:33.859519Z

and definitely doesn't have a persistence story 🙂
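
A rough sketch of how that event bus could feed an SSE response, assuming one in-process bus per application. The names `updates`, `:page` and `sse-output` are made up for illustration; `manifold.bus/event-bus`, `subscribe` and `publish!` are the Manifold calls:

```clojure
(require '[manifold.bus :as bus]
         '[manifold.stream :as s])

;; Hypothetical application-wide bus carrying page updates.
(def updates (bus/event-bus))

(defn sse-output [s]
  (format "data: %s\n\n" s))

(defn sse-handler [req]
  ;; Each request gets its own subscription stream; every message
  ;; published on the :page topic is formatted as one SSE event.
  {:status 200
   :headers {"content-type" "text/event-stream"}
   :body (s/map sse-output (bus/subscribe updates :page))})

(defn notify-page!
  "Publishes a single-line HTML fragment to every current subscriber."
  [fragment]
  (bus/publish! updates :page fragment))
```

Whenever some state changes, calling `(notify-page! "<div id=\"clock\">12:00</div>")` would push that fragment to all connected clients. Nothing is persisted: a client that is not subscribed at that moment misses the message.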

2026-02-09T14:04:37.290979Z

Anyway, all I was trying to say is that manifold may offer some nice options for managing streams of events, which are interesting when doing backends in Datastar.

2026-02-09T14:05:09.144669Z

So Aleph may be a good addition to the Ring adapters we support.

dergutemoritz 2026-02-09T14:07:22.163899Z

Yep, for that purpose it should definitely be a good fit 👍

2026-02-08T19:08:28.584019Z

Don't think you can do it without involving another thread somehow, otherwise it would be synchronous? So the status and headers can't be flushed

2026-02-08T20:31:15.238329Z

I have not dived into the Aleph source but that's the feeling I have. With Ring Jetty and Http-kit I managed the model I describe, that is, flushing the status and headers then starting to send events in the same thread that handles the response, i.e., no new thread needed. If I can't do it that way with Aleph I'll mention the need to create a thread to free the one handling the response. I needed to make sure though, so I came and asked 😀

2026-02-09T03:06:02.682009Z

Hmm yeah I guess it makes sense that it could happen within one thread. If it is possible in aleph I don't know how. In my conceptual model of aleph it wraps the ring protocol so the response map has to be complete before it does something. If the body is a stream, this part is streamed to the client. But this setup means the stream has to be filled by another thread. Does httpkit do something outside of the ring protocol? How do you set the status and the headers in the setup you mention?