#missionary
2023-02-06
denik22:02:36

I’m working on wrapping an SSE (server-sent events) stream in missionary and ultimately photon. My wrapper has a callback that receives strings of values as they become available. The goal is to have a reactive value that represents the entire string (all previous values concatenated) to be rendered in the UI. How would one do this idiomatically using missionary/photon?

denik22:02:47

the following works great:

#?(:cljs
   (p/defn OpenAIStream [model-opts]
     (->> (m/observe
            (fn [!]
              (openai-stream "/completions"
                             model-opts
                             (fn [r] ; <= callback
                               (js/console.log r)
                               (! r)))))
          (m/reductions str "")
          new)))

(p/defn TestStreams []
  (p/client
    (dom/div "foo")
    (dom/div (new OpenAIStream {:prompt      "count to 5:"
                                :model       "text-davinci-001"
                                :temperature 1
                                :stream      true
                                :max_tokens  256}))))
why does new need to be used twice?

denik23:02:07

the result is very satisfying btw

👀 2
denik00:02:50

this works great for rendering the result directly into the dom. however, I’d actually like to run this flow only when a button is pushed, and also swap! its state into an atom. here’s some code I have that does not work:

(dom/div
  {::dom/class         [:ma2 :self-center :pointer :br3 :ba :ph2 :pv1 :b--gray
                        (if btn-disabled? :o-50 :dim)]
   ::dom/aria-disabled btn-disabled?
   ::ui/click-event    (p/fn [e]
                         (swap! !state dissoc :completions)
                         (->> (m/observe
                                (fn [!]
                                  (openai-stream "/completions"
                                                 (assoc @!state :stream true)
                                                 !)))
                              (m/reductions (fn [out {:keys [index text]}]
                                              (println :running out)
                                              (update out index str text))
                                            [])
                              (m/reductions (fn [_ res]
                                              (swap! !state assoc :completions res)
                                              )
                                            nil)))}
  "Complete it!")

xificurC08:02:54

> why does new need to be used twice?
A p/defn returns a flow that you have to call (join). The join operator is new. m/reductions also returns a flow, so inside photon you also have to join it. Does that make sense?

👍 2
🙏 2
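
A stripped-down sketch of the two joins (Ticks and UseTicks are hypothetical names, following the same pattern as the OpenAIStream example above):

#?(:cljs
   (p/defn Ticks []
     (->> (m/observe (fn [!] (! 0) (fn [] nil))) ; a discrete missionary flow
          (m/reductions + 0)                     ; still a missionary flow...
          new)))                                 ; ...first join, inside the p/defn

(p/defn UseTicks []
  (dom/div (new Ticks)))                         ; ...second join, at the call site
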
xificurC08:02:33

m/observe expects the return value of your function to be a 0-arg fn, for unmounting. Does openai-stream return such a function?

👍 2
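
For reference, the contract m/observe expects, as a small cljs sketch (js/setInterval stands in for the real event source here):

(m/observe
  (fn [!]                                        ; ! pushes values into the flow
    (let [iid (js/setInterval #(! (js/Date.now)) 1000)]
      (fn [] (js/clearInterval iid)))))          ; this 0-arg fn runs on unmount
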
xificurC08:02:22

for your last example, one reason it wouldn't work is that you didn't call new on the missionary flow. I think the code could be simplified somewhat; the previous version, with a separate flow returning the streamed strings, seemed better factored

xificurC08:02:12

I don't have your setup so cannot test this but this should be in the right direction. Let me know if I can help further

denik18:02:11

tried the latter. sadly it does not work

denik19:02:03

here’s a simple repro

denik19:02:50

state is reset once, not with every new value

xificurC20:02:05

the problem is the click handler unmounts before the flow would finish. If you run the body of it outside the click handler it runs correctly. Does the openai API provide a way to know when the streaming is completed?
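
A rough sketch of that direction, reusing the OpenAIStream p/defn from above (the !run? flag and the Completions component are hypothetical): the click handler only flips a flag, and the long-lived flow is mounted by the surrounding p/defn, so it isn't torn down when the handler returns.

(p/defn Completions []
  (let [!run? (atom false)
        run?  (p/watch !run?)]
    (dom/div {::ui/click-event (p/fn [e] (reset! !run? true))}
             "Complete it!")
    (when run?
      (dom/div (new OpenAIStream {:prompt "count to 5:"
                                  :model  "text-davinci-001"
                                  :stream true})))))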

xificurC21:02:20

Here's your repro with an addition - the p/fn is now Pending until the done? dfv is completed
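
That snippet isn't preserved in this log; roughly, the idea looks like the hypothetical sketch below (not the original code): the handler joins a dataflow variable, so it stays Pending until the callback completes it on the finish payload.

::ui/click-event
(p/fn [e]
  (let [done? (m/dfv)]
    (swap! !state dissoc :completions)
    (openai-stream "/completions" (assoc @!state :stream true)
                   (fn [r]
                     (if (:finished? r)                       ; hypothetical finish check
                       (done? true)                           ; complete the dfv
                       (swap! !state update :completions str (:text r)))))
    (new (m/ap (m/? done?)))))                                ; Pending until done? is assigned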

denik17:02:56

great. confirming that this works. thanks @U09FL65DK

😉 2
denik17:02:55

and yes openai has a finish payload

xificurC08:02:50

we could get rid of the dfv by removing the reductions and reducing over the flow. Something along the lines of below
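
The attached snippet isn't preserved in this log; very roughly, the idea is something like this hypothetical sketch, where m/reduce turns the flow into a task that completes with the final accumulated value once the stream terminates:

(m/reduce (fn [out {:keys [index text]}]
            (update out index str text))
          []
          (m/observe
            (fn [!]
              (openai-stream "/completions" (assoc @!state :stream true) !)
              (fn [] nil))))                     ; no-op unmount, for the sketch only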

denik16:02:29

I tried that but then it would only return the final result

denik16:02:19

but with a callback that works:

#?(:cljs
   (defn test2 [on-value]
     (->> (m/observe
            (fn [!]
              (let [iid (js/setInterval #(! (rand-int 100)) 150)]
                #(js/clearInterval iid))))
          (m/reduce (fn [out v]
                      (if (> v 90)
                        (reduced out)
                        (let [next-out (conj out v)]
                          (on-value next-out)
                          next-out)))
                    []))))
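
(Side note: m/reduce returns a task, so to actually run test2 you'd invoke the task with success and failure callbacks, e.g.:)

((test2 #(js/console.log "partial:" (pr-str %)))  ; build the task with an on-value callback
 #(js/console.log "done:" (pr-str %))             ; success continuation
 #(js/console.error %))                           ; failure continuation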

xificurC16:02:59

yes, if you want to stream the intermediate results you'll have to do so out of band. You had to call reset! anyway, right?

Daniel Jomphe18:02:36

Is this stuff amenable to someday having a configuration of photon that doesn't rely on WebSocket? 🙂

xificurC18:02:57

It's possible in the future. What transport do you have in mind?

Daniel Jomphe18:02:09

HTTP with SSE! 🙂 Through AWS API Gateway... hmmm, Direct Connect? I don't remember if it's still through a gateway or direct. Anyway, it's transparent to client and server, as long as they speak HTTP, obviously.

xificurC19:02:09

Thanks. Why do you prefer http with sse over websocket?

Daniel Jomphe19:02:24

We run from Datomic Cloud Ions, which serves the backend behind AWS API Gateway and doesn't expose a default working WebSocket setup at all. Given that, a few years ago https://clojurians.slack.com/archives/C03RZMDSH/p1638273464472100. It isn't what you expect. They say it scales WebSockets infinitely, and it's true: no need to stand up one more server instance per X socket connections, etc. The price you pay: their abstraction leaks into your app's design, translating WS to... HTTP! In other words:
• the client makes a WS connection to you; AWS API Gateway handles it for you, saves a connectionId and... makes an HTTP call to your server, passing it this connectionId.
• when your server is ready to respond to the WS frame it received through a regular HTTP call, it must, if I remember well, make an HTTP call to API Gateway to pass it the corresponding connectionId and response body, and then API Gateway puts that back onto the WS connection it maintained for you and the client.
So your server needs to store and remember this connectionId, and speak HTTP instead of WS. There's no proxying that makes it appear to your server as if it were really speaking WS. That's where the abstraction leaks. And we pay API Gateway to scale & maintain the WS connections instead of paying for "metal" WS traffic and server instances to support the connections.
Of course, we could say: ok, well, let me instead spin up a WS server on my backend and configure the firewall to let direct WS traffic pass in and out. This would amount to spinning up a web server in my app (which doesn't include a web server, just a router - API Gateway provides the auto-scaled, load-balanced web server in front of my app's router and HTTP handler(s)). Spinning up a web server inside my app just for the WS part would seem redundant, and then I'd need to monitor the concurrent WS connections and use that as a scaling signal to stand up more server instances; but since I'm in this Cognitect-defined PaaS called Datomic Cloud Ions, I don't like to play it this way - there's a limit to the number of Ion server instances that are configurable out of the box. Of course, I could consult with Cognitect about this, and they might tell me they'll support me without a flinch. Not sure.
We ended up not needing WS anymore and... removed all the code. We might soon need to support semi-realtime messaging, and I wonder if we could translate that to SSE traffic over HTTP to avoid so many possible tradeoffs.
Dennis, if you're reading - sorry to have hijacked your thread!!!

👀 2
xificurC20:02:31

Thanks for the detailed response! I logged a ticket for this. For local tests we run the code for the 2 peers on a single peer, so we do have this abstracted.

denik17:02:18

@U0514DPR7 nw about hijacking the thread 😄 however, I’m wondering whether SSE is a good fit, since photon requires a two-way client/server connection and SSE is a one-way street from the server. of course the client could issue HTTP requests and the server could send events using SSE.
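
A tiny client-side sketch of that shape (endpoint names are hypothetical): SSE for the server-to-client stream, plain HTTP POSTs for client-to-server.

(defonce down (js/EventSource. "/photon-events"))              ; server -> client
(.addEventListener down "message" (fn [e] (js/console.log (.-data e))))

(defn up! [msg]                                                ; client -> server
  (js/fetch "/photon-send" #js {:method "POST" :body (pr-str msg)}))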

Dustin Getz04:02:18

daniel how many concurrent users does your app serve?

Daniel Jomphe13:02:55

Well...
• Currently, a few users to a few dozen concurrent users.
• When this thing reaches all intended kinds of users in a few years... 10,000 to 50,000 actively concurrent users, for sure, most doing queries.
This number isn't coming from our business analysts, just some rule of thumb, but it should help frame what's being discussed here. I've never been able to wrap my head around the practicalities of SSE vs WS. It always felt to me that most uses of WS would be served as well - or more simply, more efficiently and with a better developer experience - by SSE, if SSE were well implemented across all use cases. Sadly, SSE implementations in browsers didn't receive as much attention as WS libraries did. So I'm not sure whether we should really invest in planning for SSE over WS or not. If both options were available, it might be great.

👀 2