2024-02-01
# missionary
Am I hitting a bug? (Solved, needed to wrap the completion flow in ap and do the openai call in there via a task.)
I suppose "aws" was supposed to be "paws"? "Paws" would then be the token. open-ai streams are 1 token per event, so it is not missing tokens, but rather something else afterward is cutting out
I think it might be multiple tokens because responses start with a single empty string, which you wouldn't see because of the reduction with str
but it means that the "P" that comes afterwards. So it's not just the first event that's missing. Now I'm thinking I should turn the call to api/create-chat-completion into a task and wrap the returned flow somehow, since completion-flow
is supposed to return a side-effect free thunk?
OpenAI streams are always one token per event
You can print each event to verify that
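For reference (illustrative, field set abbreviated), each streamed chunk is one SSE data line carrying a delta, e.g.
data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"P"},"finish_reason":null}]}
and the stream is terminated by
data: [DONE]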
Last time I checked that was the case. It's also the only way to track your token consumption, because OpenAI doesn't give you the number of tokens used for completion when you are using streams
You might be correct, though note that "paws" is not a token itself
you can't deduce that from the output sadly; we don't have their tokenizer and we don't know what they do on the back end to feed their model. If you use function calling, for example, they add more to the context of the query with a typed version of what your function calls look like
I thought jtokens with the right parameters tokenizes exactly like they do?
Well it says you don’t know exactly what they input and count in the input. That doesn’t mean that tiktoken can’t tokenize exactly like they do
But you are using 3.5, and the parameters for 4 and 4-turbo are already known and can be set when counting tokens
Doesn’t help with your issue though
yeah I meant I don't think the tokenizer for gpt-4 is open source, either way the issue I'm having is not to do with tokenizing 🙂
https://platform.openai.com/tokenizer seems they did open-source that
I can double check, because like your article mentions it depends on what they count, but if I recall, for simple chat completion you simply count the whole JSON
But yeah, from what I remember, tokenizing based on the model is not an issue.
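For what it's worth, a minimal sketch of counting tokens from Clojure, assuming the JTokkit Java library is what's meant above:
;; assumption: JTokkit (com.knuddels/jtokkit) is on the classpath
(import '(com.knuddels.jtokkit Encodings)
        '(com.knuddels.jtokkit.api ModelType))

(let [registry (Encodings/newDefaultEncodingRegistry)
      encoding (.getEncodingForModel registry ModelType/GPT_4)]
  ;; counts tokens for a plain string; counting a full chat payload
  ;; also needs the per-message overhead OpenAI documents separately
  (.countTokens encoding "Name a cute aspect of the cat."))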
So all I'm saying is I suspect "aws" is not a token, and therefore, if that's confirmed, your issue would be after you parse the events' JSON
I’m actually here because I am considering rewriting my OpenAI pipelines from manifold to missionary 😅
I think missionary is really promising and has been quite the adventure to wrap my head around. Really shows how much we take for granted wrt back-pressure.
yeah I shouldn't have said token, but event; it seems a couple of initial events are sometimes missing, suggesting it's already consuming the channel before my flow gets to tap into it.
regarding back-pressure do you have to do anything in particular? with manifold I just threw buffers at the problem
btw you don't have to give chat-completion a system message especially one that doesn't add anything to your prompt, that will save you some tokens if it matters to you
@U09MR0T5Y why didn't you use observe? the openai stream is a non-backpressured object, right?
I briefly considered observe as well, but I don't know if it cleans up the resources that the openai api creates well; might be a good option though, I'm a bit of a missionary noob. Since there's an example of using channels with the blocking via at https://github.com/leonoel/missionary/wiki/Creating-flows, I thought I'd just go ahead and use that :)
I suppose you are talking about the wkok library? I just make regular http calls to openai with aleph and get a java InputStream
(def get-data (comp println json/decode #(str/replace % "data: " "")))

(def completion-flow
  (m/observe
    (fn [!]
      (openai/stream-chat-completion
        {:model :gpt4-turbo}
        [{:role "system", :content "You are a helpful assistant."}
         {:role "user", :content "Name a cute aspect of the cat."}]
        !)
      #())))

(m/? (m/reduce (fn [out event]
                 (case event
                   "" out
                   "data: [DONE]" (reduced out)
                   (get-data event)))
               []
               completion-flow))
right now I'm playing with that, but I still use manifold to do stream/consume on the input stream and pass it the ! callback, so the next step is to get rid of that and consume the stream directly if that is possible.
probably something pretty much like the code in the wkok lib, except that it wouldn't use core.async at all and would just call the callback instead of putting the message in a channel
Cool! Yeah I appreciate getting to the lowest level possible. That will also make it easier to consume local LLMs if you'd like. But I do think the wkok lib is solid 🙂
So far in those cases I’ve relied on python libraries. But they also have different streaming approaches.
I don't necessarily agree, at least when it comes to how it handles SSE
with the snippet above, using clj-http and this function on the body to process the stream:
;; assumes (:import (java.io BufferedReader InputStreamReader Reader IOException))
(defn consume-event-stream [event-stream !]
  (let [input-stream (InputStreamReader. event-stream)
        reader (BufferedReader. ^Reader input-stream)]
    (while
      (try
        (let [l (.readLine reader)]
          (! l)
          l)
        (catch IOException e
          nil)))))
it's looking good, except that now, while processing the first event, I get "Can't process event - observer is not ready.", because now there is no more back pressure and I need a buffer
what for?
@U03K8V573EC how is it consumed? you may just have a missing relieve somewhere
@U053XQP4S but isn't relieve dropping events?
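For context, relieve combines the values the consumer hasn't transferred yet using the reducing function you pass it, so dropping is a choice, not a given. A minimal sketch (some-flow is a placeholder):
;; ({} old new) is a map lookup miss returning the default `new`,
;; so this keeps only the latest pending value (i.e. it drops):
(m/relieve {} some-flow)

;; wrapping each value in a vector and combining with `into`
;; accumulates pending values instead of dropping them:
(m/relieve into (m/eduction (map vector) some-flow))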
(defn consume-event-stream [event-stream !]
  (let [input-stream (InputStreamReader. event-stream)
        reader (BufferedReader. ^Reader input-stream)]
    (while
      (try
        (let [l (.readLine reader)]
          (! l)
          l)
        (catch IOException e
          nil)))))
(def completion-flow
  (m/observe
    (fn [!]
      ;; just a clj-http/post with :as :stream option
      (openai/stream-chat-completion
        {:model :gpt4-turbo}
        [{:role "system", :content "You are a helpful assistant."}
         {:role "user", :content "Name a cute aspect of the cat."}]
        !)
      #(println :cleanup))))

(defn run []
  (m/? (m/reduce (fn [out event]
                   (case event
                     "" out
                     "data: [DONE]" (reduced out)
                     (do (println event) event)
                     #_(get-data event)))
                 []
                 completion-flow)))
with println debugging (not sure how else to do it) it fails the second time ! is called in consume-event-stream
and right after the exception the body in the reducing function runs for the first time
@U053XQP4S could it be that the observer is indeed not ready yet?
I'm failing to understand who is in charge of calling consume-event-stream in your example
it's called on the body returned by clj-http in this function:
(openai/stream-chat-completion
  {:model :gpt4-turbo}
  [{:role "system", :content "You are a helpful assistant."}
   {:role "user", :content "Name a cute aspect of the cat."}]
  !)

(defn stream-chat-completion
  [ctx messages cb]
  (-> (http-client/post endpoint params)
      :body
      (consume-event-stream cb)))
yes I guess it never returns
what should be used instead? or should stream-chat-completion be turned into a task or something that returns?
the right pattern with m/observe is to wrap the blocking loop in a thread
(m/observe
  (fn [cb]
    (let [t (Thread.
              (reify Runnable
                (run [_]
                  (-> (http-client/post endpoint params)
                      :body
                      (consume-event-stream cb)))))]
      (.start t)
      #(.interrupt t))))
here is another solution
(defn event-stream [endpoint params]
  (m/ap
    (let [event-stream (:body (m/? (m/via m/blk (http-client/post endpoint params))))
          reader (BufferedReader. (InputStreamReader. event-stream))]
      (try
        (loop []
          (m/amb (m/? (m/via m/blk (.readLine reader)))
                 (recur)))
        (catch IOException _
          (m/amb))))))
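For example, to consume it (sketch; endpoint and params as above, blocks the calling thread):
;; collect every SSE line into a vector until the connection closes
(m/? (m/reduce conj [] (event-stream endpoint params)))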
nice, thank you @U053XQP4S. the first one with the thread works; the second one I'm about to try, assuming event-stream is a flow, and this solution doesn't need m/observe
I'm a little bit confused by m/amb, is it referring to the same operator described in SICP? what's happening in that loop?
m/amb evaluates its branches sequentially and concatenates the results. It can return more than once - or not at all, e.g. (m/amb) doesn't return. For each returned value, the evaluation resumes with this value.
In this example, we want to loop over 1. read a line and 2. emit this line; we can define this pattern recursively with loop/recur
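A tiny REPL illustration of those semantics:
;; the ap body "returns" once per amb branch, giving a flow of 1 2 3
(m/? (m/reduce conj [] (m/ap (m/amb 1 2 3))))
;; => [1 2 3]

;; evaluation resumes with each returned value, so inc runs twice
(m/? (m/reduce conj [] (m/ap (inc (m/amb 0 10)))))
;; => [1 11]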
how can I stop the recursion when readLine is returning nil?
I did
(m/ap
  (let [response (m/? (stream-chat-completion-request ctx messages))
        event-stream (:body response)
        reader (BufferedReader. (InputStreamReader. event-stream))]
    (try
      (loop []
        (let [l (m/? (m/via m/blk (.readLine reader)))]
          (if-not (= l "data: [DONE]")
            (m/amb l (recur))
            (m/amb))))
      (catch IOException _
        (m/amb)))))
@U053XQP4S I ended up using this:
(defn chat-completion-flow
  ([ctx messages]
   (chat-completion-flow ctx messages identity))
  ([ctx messages xf]
   (m/eduction
     (keep (fn [event]
             (openai-utils/get-data event)))
     xf
     (m/ap
       (let [sse-stream ($chat-completion-sse-stream ctx messages)
             reader (BufferedReader.
                      (InputStreamReader. sse-stream))]
         (try
           (loop []
             (let [l (m/? (m/via m/blk (.readLine reader)))]
               (case l
                 "data: [DONE]" (m/amb)
                 "" (recur)
                 (m/amb l (recur)))))
           (catch IOException _
             (m/amb))))))))
One question that remains for me is how the backpressure is managed there. If the eduction ends up producing values faster than they are consumed, is the BufferedReader taking the backpressure? If it wasn't a BufferedReader, what would happen?
missionary processes don't over-produce. m/eduction notifies its consumer it has a value available. It will pull a new value from the m/ap child only when the consumer transfers the value. There are exceptions like m/observe, which has no control over the underlying resource; other than that, missionary flows create backpressured processes
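A quick way to see this at the REPL (sketch; the producer stays at most one value ahead of the consumer):
;; :produced prints are paced by the sleeping consumer's transfers,
;; not emitted all up front
(def nums
  (m/ap (let [n (m/amb 1 2 3)]
          (println :produced n)
          n)))

(m/? (m/reduce (fn [acc n] (Thread/sleep 1000) (conj acc n))
               []
               nums))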
You can read the https://github.com/leonoel/flow if you want to better understand the underlying machinery
Right, but since $chat-completion-sse-stream is just an InputStream of Server Sent Events, that means they are accumulating in the BufferedReader if they are not being pulled by the eduction fast enough
when the BufferedReader is full I would expect the network stack to propagate backpressure to the server via TCP flow control
I saw that stream and signal from https://github.com/leonoel/missionary/issues/70 have been implemented, but what about replay?
not yet, I mentioned it because it exists elsewhere but I never really had a use case for it
How does one run a task and return something immediately while the task keeps running? Run the task in a vanilla thread? My use case (in case I'm completely missing a better solution) is that I do I/O in that task in an HTTP request handler and want to return immediately to the client while running the task in the background, the task pushing updates through WS.
It is discouraged, by design, to fire-and-forget a task as you would do with e.g. futures. Instead, use parallel composition to make the supervision tree explicit
(m/join vector
  (push-updates ws)
  (do-something-else))
When you're done building the supervision tree and you just want to run the whole task, just call it with a pair of callbacks
(def cancel (task #(prn :success %) #(prn :failure %)))
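the value bound to cancel above is a zero-argument thunk; invoking it cancels the still-running task and everything it supervises:
(cancel) ;; cancels the whole supervision tree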
Thanks! I think your second snippet is all I need, I was looking into the API all this time when the answer was in the basic walkthrough:
Asynchronously run a task by invoking it and use continuation functions to process a successful or failing result.
AFAICT that's all I need and it works.
So in the docs it is here in the wiki: https://github.com/leonoel/missionary/wiki/Basic-Walkthrough:-Tasks-&-Flows but not in the Hello Task tutorial from the repo: https://github.com/leonoel/missionary/blob/master/doc/tutorials/hello_task.md It's also in the quickstart repo, but the main issue is that someone primarily using the repo as a resource (like me) can easily miss out
((my-task)
 #(println :done %)
 #(println :failed))
This never prints failed if the task fails
((my-task)
 #(println :done %)
 #(println :failed %))
But this does with the exception
...:failed #error {
:cause Cannot invoke "clojure.lang.IFn.invoke(Object)" because "cr133603_place_18" is null
:via
[{:type java.lang.NullPointerException
:message Cannot invoke "clojure.lang.IFn.invoke(Object)" because "cr133603_place_18" is null...
yeah, the fact that the second snippet fails is not an issue; the issue was that I didn't know it was failing until I added the %
yes, I acknowledge it makes for a poor developer experience; I've yet to figure out the best way to report errors of that kind
another small thing is I couldn't figure out how to use def directly within an sp/ap expression (had to do it in a function). I say it's small because I know it's a weird thing to do, but I guess it might affect some other macros as well. I sometimes do that for debugging purposes
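For what it's worth, a sketch of the function workaround mentioned; debug! and captured are made-up names:
;; assumption: a plain fn sidesteps the sp/ap macro rewriting that
;; trips on a bare `def` inside its body
(defn debug! [x] (def captured x) x)

(m/sp (debug! (m/? some-task)))
;; then inspect `captured` at the REPL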