#missionary
2024-02-01
bbss06:02:37

Am I hitting a bug? (Solved, needed to wrap the completion flow in m/ap and do the openai call in there via a task.)

Eric Dvorsak07:02:22

I suppose "aws" was supposed to be "paws"? "Paws" would then be the token. open-ai streams are 1 token per event, so it is not missing tokens, but rather something else afterward is cutting out

bbss07:02:27

I think it might be multiple tokens, because responses start with a single empty string, which you wouldn't see because of the reduction with str, but it means the "P" comes in the event afterwards. So it's not just the first event that's missing. Now I'm thinking I should turn the call to api/create-chat-completion into a task and wrap the returned flow somehow, since completion-flow is supposed to return a side-effect-free thunk?
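For illustration, the typical shape of those streamed chunks (a sketch of the OpenAI SSE format, not a capture from this session):

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":""}}]}
data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"P"}}]}
data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"aws"}}]}
data: [DONE]

A reduction with str hides the empty first delta:

(reduce str ["" "P" "aws"]) ;; => "Paws" (the leading empty string leaves no trace)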

Eric Dvorsak08:02:34

OpenAI streams are always one token per event

bbss08:02:47

I don't think that's correct, and I don't think Paws would be a single token.

Eric Dvorsak08:02:15

You can print each event to verify that

Eric Dvorsak08:02:03

Last time I checked, that was the case. It's also the only way to track your token consumption, because OpenAI doesn't give you the number of tokens used for the completion when you are using streams

Eric Dvorsak08:02:13

You might be correct, though, that "paws" is not a token itself

bbss08:02:16

you can't deduce that from the output sadly, we don't have their tokenizer and we don't know what they do in the backend to feed to their model. If you use function calling, for example, they add more to the context of the query with a typed version of what your function calls look like

Eric Dvorsak08:02:04

I thought jtokens with the right parameters tokenizes exactly like they do?

bbss08:02:21

This gets close:

Eric Dvorsak08:02:19

Well it says you don’t know exactly what they input and count in the input. That doesn’t mean that tiktoken can’t tokenize exactly like they do

bbss08:02:26

that, and also gpt-4 uses a different tokenizer

Eric Dvorsak08:02:25

But you are using 3.5, and the parameters for 4 and 4-turbo are already known and can be set when counting tokens
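For reference, a minimal token-counting sketch (not from the thread; assumes the com.knuddels/jtokkit Java library and its ModelType API):

(import '(com.knuddels.jtokkit Encodings)
        '(com.knuddels.jtokkit.api Encoding EncodingRegistry ModelType))

;; count tokens with the encoding registered for a given chat model
(let [^EncodingRegistry registry (Encodings/newDefaultEncodingRegistry)
      ^Encoding enc (.getEncodingForModel registry ModelType/GPT_3_5_TURBO)]
  (.countTokens enc "Paws are a cute aspect of the cat.")) ;; => an int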

Eric Dvorsak08:02:53

Doesn’t help with your issue though

bbss08:02:00

yeah I meant I don't think the tokenizer for gpt-4 is open source. Either way, the issue I'm having is not to do with tokenizing 🙂

bbss08:02:06

good to know it can be predicted accurately

Eric Dvorsak08:02:15

I can double check because, like your article mentions, it depends what they count, but if I recall correctly, for simple chat completion you simply count the whole JSON

Eric Dvorsak08:02:45

But yeah, from what I remember, tokenizing based on the model is not an issue.

👍 1
Eric Dvorsak08:02:58

So all I'm saying is I suspect "aws" is not a token, and therefore, if that's confirmed, your issue would be after you parse the events' JSON

Eric Dvorsak08:02:45

I’m actually here because I am considering rewriting my OpenAI pipelines from manifold to missionary 😅

🙂 1
bbss08:02:20

I think missionary is really promising and has been quite the adventure to wrap my head around. Really shows how much we take for granted wrt back-pressure.

bbss08:02:58

yeah, I shouldn't have used the word token, but events. It seems a couple of initial events sometimes are missing, suggesting something is already consuming the channel before my flow gets to tap into it.

Eric Dvorsak10:02:07

regarding back-pressure do you have to do anything in particular? with manifold I just threw buffers at the problem

Eric Dvorsak10:02:22

btw you don't have to give chat-completion a system message, especially one that doesn't add anything to your prompt; that will save you some tokens if it matters to you

bbss10:02:31

I'm aware! Have spent a fair bit of time playing around with LLMs

Eric Dvorsak10:02:01

@U09MR0T5Y why didn't you use observe? the openai stream is a non-backpressured object, right?

bbss11:02:48

I briefly considered observe as well, but I don't know if it cleans up the resources that the openai api creates well. It might be a good option though; I'm a bit of a missionary noob. Since there's an example of using channels with blocking at https://github.com/leonoel/missionary/wiki/Creating-flows, I thought I'd just go ahead and use that :)

Eric Dvorsak11:02:37

I suppose you are talking about wkok library? I just make regular http calls to openai with aleph and get a java InputStream

bbss11:02:44

Yes, I am indeed using that lib.

Eric Dvorsak11:02:21

(def get-data (comp println json/decode #(str/replace % "data: " "")))
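;; note: comp applies right-to-left: strip the "data: " prefix, json/decode, then println (which returns nil)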
(def completion-flow
  (m/observe
   (fn [!]
     (openai/stream-chat-completion
      {:model :gpt4-turbo}
      [{:role "system", :content "You are a helpful assisant."}
       {:role "user", :content "Name a cute aspect of the cat."}]
      !)
     #())))

(m/? (m/reduce (fn [out event]
                 (case event
                   "" out
                   "data: [DONE]" (reduced out)
                   (get-data event)))
               []
               completion-flow))
right now I'm playing with that, but I still use manifold to do stream/consume on the input stream and pass it the ! callback, so the next step is to get rid of that and consume the stream directly, if that is possible. Probably something pretty much like the code in the wkok lib, except that it wouldn't use core.async at all and would just call the callback instead of putting the message on a channel

bbss11:02:51

Cool! Yeah, I appreciate getting to the lowest level possible. That will also make it easier to consume local LLMs if you'd like. But I do think the wkok lib is solid 🙂

bbss11:02:54

So far in those cases I’ve relied on python libraries. But they also have different streaming approaches.

Eric Dvorsak11:02:50

I don't necessarily agree, at least when it comes to how it handles SSE

Eric Dvorsak12:02:35

with the snippet above, using clj-http and this function on the body to process the stream:

(defn consume-event-stream [event-stream !]
  (let [input-stream (InputStreamReader. event-stream)
        reader (BufferedReader. ^Reader input-stream)]
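    ;; loops until .readLine returns nil (EOF) or an IOException ends the stream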
    (while
     (try
       (let [l (.readLine reader)]
         (! l)
         l)
       (catch IOException e
         nil)))))
it's looking good, except that now, while processing the first event, I get "Can't process event - observer is not ready." because there is no more back-pressure and I need a buffer

bbss12:02:48

You can emit an empty string or dummy event before opening the stream maybe?

bbss12:02:29

I don’t know. Not a missionary expert yet but that’s what I would try wrt that error.

leonoel12:02:20

@U03K8V573EC how is it consumed? you may just have a missing relieve somewhere

Eric Dvorsak12:02:51

@U053XQP4S but isn't relieve dropping events?

leonoel12:02:32

the arity 1, yes
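For reference (a sketch, not from the thread; line-flow is hypothetical): the two-argument arity takes a function that merges values the consumer hasn't transferred yet, so nothing is dropped:

(m/relieve str line-flow) ; pending lines are concatenated instead of discarded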

Eric Dvorsak12:02:04

(defn consume-event-stream [event-stream !]
  (let [input-stream (InputStreamReader. event-stream)
        reader (BufferedReader. ^Reader input-stream)]
    (while
     (try
       (let [l (.readLine reader)]
         (! l)
         l)
       (catch IOException e
         nil)))))

(def completion-flow
  (m/observe
   (fn [!]
     ;; just a clj-http/post with :as :stream option
     (openai/stream-chat-completion
      {:model :gpt4-turbo}
      [{:role "system", :content "You are a helpful assisant."}
       {:role "user", :content "Name a cute aspect of the cat."}]
      !)
     #(println :cleanup))))

(defn run []
  (m/? (m/reduce (fn [out event]
                   (case event
                     "" out
                     "data: [DONE]" (reduced out)
                     (do (println event) event)
                     #_(get-data event)))
                 []
                 completion-flow)))

👀 1
Eric Dvorsak13:02:11

with println debugging (not sure how else to do it) it fails the second time ! is called in consume-event-stream

Eric Dvorsak13:02:08

and right after the exception the body in the reducing function runs for the first time

Eric Dvorsak13:02:42

@U053XQP4S could it be that the observer is indeed not ready yet?

leonoel13:02:36

I'm failing to understand who is in charge of calling consume-event-stream in your example

Eric Dvorsak13:02:37

it's called on the body returned by clj-http in this function:

(openai/stream-chat-completion
      {:model :gpt4-turbo}
      [{:role "system", :content "You are a helpful assisant."}
       {:role "user", :content "Name a cute aspect of the cat."}]
      !)

leonoel13:02:33

so this function is running the read loop and never returns?

Eric Dvorsak13:02:24

(defn stream-chat-completion
  [ctx messages cb]  
  (-> (http-client/post endpoint params)
      :body
      (consume-event-stream cb)))

leonoel13:02:58

ok, thanks

Eric Dvorsak13:02:33

yes I guess it never returns

leonoel13:02:51

this is bad usage of m/observe, the boot function is supposed to return immediately

Eric Dvorsak13:02:28

what should be used instead? or should stream-chat-completion be turned into a task or something that returns?

leonoel13:02:07

the right pattern with m/observe is to wrap the blocking loop in a thread

(m/observe
  (fn [cb]
    (let [t (Thread.
              (reify Runnable
                (run [_]
                  (-> (http-client/post endpoint params)
                    :body
                    (consume-event-stream cb)))))]
      (.start t)
      #(.interrupt t))))

leonoel13:02:15

here is another solution

(defn event-stream [endpoint params]
  (m/ap
    (let [event-stream (:body (m/? (m/via m/blk (http-client/post endpoint params))))
          reader (BufferedReader. (InputStreamReader. event-stream))]
      (try
        (loop []
          (m/amb (m/? (m/via m/blk (.readLine reader)))
            (recur)))
        (catch IOException _
          (m/amb))))))

Eric Dvorsak13:02:22

nice, thank you @U053XQP4S. The first one with the threads works; the second one I'm about to try, assuming event-stream is a flow and this solution doesn't need m/observe

leonoel13:02:48

yes, m/ap returns a flow and this is an alternative to m/observe

Eric Dvorsak14:02:04

I'm a little bit confused by m/amb, is it referring to the same operator described in SICP? what's happening in that loop?

leonoel14:02:45

yes, it is conceptually the same operator.

leonoel14:02:00

m/amb evaluates its branches sequentially and concatenates the results. It can return more than once - or not at all, e.g. (m/amb) doesn't return. For each returned value, the evaluation resumes with this value.

leonoel14:02:43

In this example, we want to loop over 1. read a line and 2. emit this line, we can define this pattern recursively with loop/recur
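A minimal sketch of that fork-and-resume behavior (not from the thread):

(require '[missionary.core :as m])

;; the code after m/amb runs once per branch value,
;; so this one body emits 10, then 20, then 30
(def demo (m/ap (* 10 (m/amb 1 2 3))))

(m/? (m/reduce conj [] demo)) ;; => [10 20 30]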

Eric Dvorsak17:02:04

how can I stop the recursion when readLine is returning nil?

Eric Dvorsak17:02:52

I did

(m/ap
    (let [response (m/? (stream-chat-completion-request ctx messages))
          event-stream (:body response)
          reader (BufferedReader. (InputStreamReader. event-stream))]
      (try
        (loop []
          (let [l (m/? (m/via m/blk (.readLine reader)))]
            (if-not (= l "data: [DONE]")
              (m/amb l (recur))
              (m/amb))))
        (catch IOException _
          (m/amb)))))

leonoel17:02:59

that's correct

Eric Dvorsak07:04:25

@U053XQP4S I ended up using this:

(defn chat-completion-flow
  ([ctx messages]
   (chat-completion-flow ctx messages identity))
  ([ctx messages xf]
   (m/eduction
    (keep (fn [event]
            (openai-utils/get-data event)))
    xf
    (m/ap
      (let [sse-stream ($chat-completion-sse-stream ctx messages)
            reader (BufferedReader.
                    (InputStreamReader. sse-stream))]
        (try
          (loop []
            (let [l (m/? (m/via m/blk (.readLine reader)))]
              (case l
                "data: [DONE]" (m/amb)
                "" (recur)
                (m/amb l (recur)))))
          (catch IOException _
            (m/amb))))))))
One question that remains for me is how the backpressure is managed there. If the eduction ends up producing values faster than they are consumed, is the BufferedReader taking the backpressure? If it wasn't a BufferedReader, what would happen?

xificurC08:04:25

missionary processes don't over-produce. m/eduction notifies its consumer it has a value available, and it will pull a new value from the m/ap child only when the consumer transfers the value. There are exceptions like m/observe, which has no control over the underlying resource; other than that, missionary flows create backpressured processes
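A small demonstration of that pull-based behavior (a sketch, not from the thread):

(def producer
  (m/ap (let [x (m/amb 1 2 3)]
          (println "produced" x) ; resumes only after the previous value was transferred
          x)))

;; a deliberately slow consumer; the producer never runs ahead of it
(m/? (m/reduce (fn [acc x] (Thread/sleep 100) (conj acc x)) [] producer))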

👍 1
xificurC08:04:29

You can read https://github.com/leonoel/flow if you want to better understand the underlying machinery

Eric Dvorsak09:04:35

Right, but since $chat-completion-sse-stream is just an InputStream of Server-Sent Events, that means they are accumulating in the BufferedReader if they are not being pulled by the eduction fast enough

leonoel09:04:33

when the BufferedReader is full I would expect the network stack to propagate backpressure to the server via TCP flow control
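For reference (my note, not from the thread): java.io.BufferedReader defaults to an 8192-char buffer, and a larger one can be passed explicitly; once it and the socket buffers fill up, TCP flow control throttles the server:

(BufferedReader. (InputStreamReader. event-stream) (* 64 1024)) ; explicit 64 KiB char buffer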

Eric Dvorsak19:02:20

I saw that stream and signal from https://github.com/leonoel/missionary/issues/70 have been implemented, but what about replay?

leonoel19:02:28

not yet, I mentioned it because it exists elsewhere but I never really had a use case for it

Eric Dvorsak23:02:27

How does one run a task and return something immediately while the task keeps running? Run the task in a vanilla thread? My use case, in case I'm completely missing a better solution, is that I do I/O in that task in an HTTP request handler and want to return immediately to the client, while running the task in the background, the task pushing updates through WS.

leonoel08:02:37

It is discouraged, by design, to fire-and-forget a task as you would do with e.g. futures. Instead, use parallel composition to make the supervision tree explicit

(m/join vector
  (push-updates ws)
  (do-something-else))
When you're done building the supervision tree and you just want to run the whole task, just call it with a pair of callbacks
(def cancel (task #(prn :success %) #(prn :failure %)))
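A usage note (not from the thread): a task invocation returns a zero-argument cancel function, so the background process stays stoppable:

(cancel) ;; interrupts the running task, which typically fails with a cancellation error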

Eric Dvorsak09:02:45

Thanks! I think your second snippet is all I need, I was looking into the API all this time when the answer was in the basic walkthrough:

Asynchronously run a task by invoking it and use continuation functions to process a successful or failing result.
AFAICT that's all I need and it works.

Eric Dvorsak09:02:13

So in the doc it is here in the wiki: https://github.com/leonoel/missionary/wiki/Basic-Walkthrough:-Tasks-&-Flows but not in the Hello Task tutorial from the repo: https://github.com/leonoel/missionary/blob/master/doc/tutorials/hello_task.md It's also in the quickstart repo, but the main issue is that someone primarily using the repo as a resource (like me) can easily miss it

Eric Dvorsak09:02:52

@U053XQP4S

((my-task)
   #(println :done %)
   #(println :failed))
This never prints failed if the task fails
((my-task)
   #(println :done %)
   #(println :failed %))
But this does, with the exception:
...:failed #error {
 :cause Cannot invoke "clojure.lang.IFn.invoke(Object)" because "cr133603_place_18" is null
 :via
 [{:type java.lang.NullPointerException
   :message Cannot invoke "clojure.lang.IFn.invoke(Object)" because "cr133603_place_18" is null...

leonoel10:02:13

task callbacks must implement arity 1, first snippet is UB

leonoel10:02:00

undefined behavior

leonoel10:02:46

why the second snippet fails, hard to say without implementation

Eric Dvorsak10:02:38

yeah, the fact that the second snippet fails is not an issue; the issue was that I didn't know it was failing until I added the %

leonoel10:02:45

yes, I acknowledge it makes for a poor developer experience. I've yet to figure out the best way to report errors of that kind

Eric Dvorsak10:02:36

another small thing is I couldn't figure out how to use def directly within an sp/ap expression (had to do it in a function). I say it's small because I know it's a weird thing to do, but I guess it might affect some other macros as well. I sometimes do that for debugging purposes
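A sketch of the function workaround mentioned (hypothetical names; spy! and some-task are not from the thread):

(defn spy! [x] (def last-seen x) x) ; a plain function, so def compiles normally

(m/sp (spy! (m/? some-task))) ; call the helper from inside the sp body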