core-async

Nim Sadeh 2025-04-10T02:17:59.415049Z

I set about implementing the OpenAI agent loop in Clojure using core.async. I thought it would be quite simple, but it ended up being a lot less fun than I anticipated. I believe it works now (it passes a boatload of unit tests), but I'm wondering if I overcomplicated things. Posting it here for educational purposes.

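(ns tools
  (:require [clojure.core.async :as async]))

;; execute-function and tools (the tool definitions) live in this ns but aren't shown here.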

(defn execute-functions
  "Executes all function calls and returns the results asynchronously on a promise-chan.

   Args:
   - function-calls: maps from OpenAI, containing the following keys:
     - call_id: the call ID of the function call that ties it to the function output
     - name: the name of the function that we use to identify which function to execute
     - arguments: a JSON-serialized string of arguments to pass to the function as an object
   - context-config: a map containing pieces of config needed to run the context tools

   Returns:
   - A promise-chan containing the results as a single flat vector: each function call
     is followed by its result, pairwise. Pairs arrive in completion order, not in the
     order the calls were made."
  [function-calls context-config]
  ;; run all function calls in parallel, each on its own thread, since some may block.
  ;; mapv (rather than lazy map) starts every thread immediately.
  (let [output (async/promise-chan)
        result-chans
        (mapv
         (fn [call]
           (async/thread
            [call
             (execute-function call context-config)])) ;; potentially blocking function that returns formatted function call output
         function-calls)]

    ;; collect all the results
    (async/go
      (->> result-chans
           async/merge
           (async/into [])
           async/<!
           (into [] cat) ;; flatten exactly one level: [[call result] ...] -> [call result ...]
           (async/>! output)))
    output))
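The fan-out/fan-in shape above (one async/thread per call, async/merge to collect, one flat vector out) can be tried without OpenAI. This is a minimal sketch, not the author's code: fake-execute is a hypothetical stand-in for execute-function, and the made-up :delay-ms key only exists to make completion order visible.

```clojure
(ns fan-out-sketch
  (:require [clojure.core.async :as async]))

;; Hypothetical stand-in for execute-function: blocks for :delay-ms,
;; then returns a map shaped like a function-call output.
(defn fake-execute [call]
  (Thread/sleep (long (:delay-ms call)))
  {:type "function_call_output"
   :call_id (:call_id call)
   :output (str "ran " (:name call))})

(defn run-all
  "Runs every call on its own thread and returns a promise-chan holding a
  flat vector [call result call result ...] in completion order."
  [calls]
  (let [out   (async/promise-chan)
        ;; mapv so all threads start right away
        chans (mapv (fn [call] (async/thread [call (fake-execute call)])) calls)]
    (async/go
      ;; merge emits each [call result] pair as soon as its thread finishes;
      ;; with no calls it closes immediately, so out receives []
      (let [pairs (async/<! (async/into [] (async/merge chans)))]
        (async/>! out (into [] cat pairs))))
    out))
```

With one slow call and one fast call, the fast call's pair should come first, because async/merge forwards values as they complete rather than in input order.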

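(ns conversation
  (:require [clojure.core.async :as async]
            [ai]      ;; the OpenAI client namespace (not shown; name assumed from the ai/ prefix)
            [tools]))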

(defn separate-functions-and-text
  "Reads a streaming output from OpenAI and splits the events into text deltas, function calls, and the assistant message.

   The new API (the Responses API) can return both text and function calls in a single response, so this function returns a separate channel for each.

   Args:
   - stream: the streaming output from OpenAI

   Returns:
   - [text-chan function-call-chan assistant-chan]:
     - text-chan: a stream of text deltas, with a .done event at the end
     - function-call-chan: a promise-chan holding a single vector of all function calls
     - assistant-chan: a promise-chan holding the assistant message (the first completed text output), needed for multiple turns and persistence
   All three are closed (function-call-chan without a value) if the stream ends without a response.completed event."
  [stream]
  (let [text-chan          (async/chan)
        function-call-chan (async/promise-chan)
        assistant-chan     (async/promise-chan)]
    (async/go-loop [function-calls []]
      (if-let [event (async/<! stream)]
        (case (-> event :type keyword)

          :response.output_text.delta
          (do
            (async/>! text-chan event)
            (recur function-calls))

          :response.output_text.done
          (do
            (async/>! text-chan event)
            (async/>! assistant-chan {:role :assistant :content (:text event)})
            (recur function-calls))

          :response.output_item.done
          (case (-> event :item :type keyword)
            :function_call
            (recur (conj function-calls (:item event)))

            (recur function-calls))

          :response.completed
          (do
            (async/close! stream)
            (async/close! text-chan)
            (async/close! assistant-chan)
            (async/>! function-call-chan function-calls))

          (recur function-calls))
        ;; the stream closed without a response.completed event (e.g. a dropped
        ;; connection): close everything so readers don't park forever
        (do
          (async/close! text-chan)
          (async/close! function-call-chan)
          (async/close! assistant-chan))))
    [text-chan function-call-chan assistant-chan]))
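On the "did I overcomplicate it?" question: if the stream channel closes by itself after its last event (an assumption; the code above closes stream explicitly on response.completed, which suggests it may not), core.async's split and reduce can do most of this routing. A sketch under that assumption, with :type values assumed to be strings as implied by (-> event :type keyword); text-event?, function-call-item? and split-events are hypothetical names, and the sketch assumes a core.async recent enough to have to-chan!.

```clojure
(ns split-sketch
  (:require [clojure.core.async :as async]))

(defn text-event? [event]
  (contains? #{"response.output_text.delta" "response.output_text.done"} (:type event)))

(defn function-call-item? [event]
  (and (= "response.output_item.done" (:type event))
       (= "function_call" (get-in event [:item :type]))))

(defn split-events
  "Returns [text-chan calls-chan]. text-chan streams the text events;
  calls-chan yields one vector of function-call items once stream closes.
  ASSUMPTION: stream closes on its own after the last event."
  [stream]
  ;; split's outputs are unbuffered: both sides must be consumed concurrently.
  ;; reduce consumes the non-text side; the caller must drain text-chan.
  (let [[text-chan other-chan] (async/split text-event? stream)]
    [text-chan
     (async/reduce (fn [calls event]
                     (if (function-call-item? event)
                       (conj calls (:item event))
                       calls))
                   []
                   other-chan)]))
```

The trade-off is that the assistant message no longer has its own channel; the final .done event on text-chan still carries the full :text if a caller needs it.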

(defn process-turn
  "Processes a single turn from OpenAI streaming, including function calls.

   This is done asynchronously, so the client stream is available immediately.

   Inputs:
   - input: must be a vector of input items in the standard OpenAI format
   - model: a valid OpenAI model
   - text: text configuration for structured outputs
   - instructions: instructions for the model (previously known as the system prompt)
   - context-config: a map containing pieces of config needed to run the context tools

   Returns a vector of three channels:
   - a channel that streams text back to the client (many maps)
   - a channel containing an error, if one occurred (one or none)
   - a channel with important events to persist in the db for the next turn (one vector)"
  [& {:keys [input model text instructions context-config]
      :or  {model :gpt-4o-mini
            text nil
            instructions "You are a helpful assistant."
            context-config {}}}]
  (let [client-chan (async/chan 1024)
        error-chan  (async/promise-chan)
        db-chan     (async/promise-chan)]
    (async/go-loop [input (vec input)] ;; vectors + into avoid nesting lazy concats across turns
      (let [response (async/<! (ai/get-response :input input
                                                :model model
                                                :instructions instructions
                                                :text text
                                                :tools tools/tools
                                                :stream true))] ;; this returns a chan containing either an error or a chan with events
        (if (instance? Throwable response)
          (do (async/>! error-chan response)
              (async/>! db-chan [])
              (async/close! client-chan))
          (let [[text-chan function-call-chan assistant-chan]
                (separate-functions-and-text response)
                ;; forward text so the client sees it before the stream ends. Unlike
                ;; async/pipe, this go-loop's result channel closes once text-chan is
                ;; drained, and it keeps draining even if client-chan has been closed.
                forwarded      (async/go-loop []
                                 (when-some [event (async/<! text-chan)]
                                   (async/>! client-chan event)
                                   (recur)))
                function-calls (seq (async/<! function-call-chan))
                assistant-msgs (filter some? [(async/<! assistant-chan)])]
            (async/<! forwarded) ;; every delta has reached client-chan before we recur or close it
            (if function-calls
              (recur (-> input
                         (into (async/<! (tools/execute-functions function-calls context-config)))
                         (into assistant-msgs)))
              (do
                (async/close! error-chan)
                (async/close! client-chan)
                (async/>! db-chan (into input assistant-msgs))))))))
    [client-chan error-chan db-chan]))
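Stripped of streaming and error handling, the control flow of process-turn is: call the model; if it asked for tools, run them and recur with the extended history; otherwise finish. A sketch of just that loop, where fake-model and fake-tool are hypothetical stand-ins for ai/get-response and the real tools, not the OpenAI API:

```clojure
(ns agent-loop-sketch
  (:require [clojure.core.async :as async]))

;; Hypothetical model: returns a channel (like ai/get-response) and keeps
;; asking for a tool until a tool result appears in the history.
(defn fake-model [history]
  (async/go
    (if (some #(= :tool-result (:type %)) history)
      {:type :text :content "done"}
      {:type :tool-call :name "lookup"})))

;; Hypothetical, possibly blocking tool.
(defn fake-tool [call]
  {:type :tool-result :name (:name call) :output 42})

(defn run-agent
  "Returns a channel that yields the full history once the model stops calling tools."
  [input]
  (async/go-loop [history (vec input)]
    (let [reply (async/<! (fake-model history))]
      (if (= :tool-call (:type reply))
        ;; run the tool off the go pool, append call + result, take another turn
        (recur (conj history reply (async/<! (async/thread (fake-tool reply)))))
        (conj history reply)))))
```

Because a go-loop's own result channel receives the loop's final value, the returned channel already carries the finished history; that is one possible way to avoid a separate db promise-chan, if callers are happy to wait on it.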

Nim Sadeh 2025-04-10T11:29:23.636469Z

It’s not blocking. ai/get-response immediately returns a channel, which receives either a single error or a single channel containing the events streamed from OpenAI.
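We can't see how ai/get-response is implemented, but one common way to get this behavior, which also keeps blocking IO off the go-block threads (the concern raised in this thread), is to run the blocking request inside async/thread and return that thread's channel. A sketch where blocking-request is a hypothetical stand-in for the HTTP call and fake-get-response for ai/get-response:

```clojure
(ns blocking-call-sketch
  (:require [clojure.core.async :as async]))

;; Hypothetical blocking HTTP request, simulated with a sleep. On success it
;; returns a channel of streamed events; on failure it throws.
(defn blocking-request [fail?]
  (Thread/sleep 50)
  (if fail?
    (throw (ex-info "request failed" {:status 500}))
    (async/to-chan! [{:type "response.completed"}])))

(defn fake-get-response
  "Returns a channel immediately. It later receives either a Throwable or the
  channel of streamed events; the blocking work runs on a dedicated thread,
  never on a go-block thread."
  [fail?]
  (async/thread
    (try
      (blocking-request fail?)
      ;; hand the error back as a value so callers can check (instance? Throwable ...)
      (catch Throwable t t))))
```

Returning the Throwable as a value matches the (instance? Throwable response) check in process-turn, and a go block can then park on the result with <! without tying up one of its threads.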

Nim Sadeh 2025-04-10T14:29:59.669589Z

Also:
> I just skimmed the code for 30 seconds so I can’t comment on whether it’s more complicated than needed.
Doesn't that say all it needs to say? That it was too complicated to grok in 30 seconds?

raspasov 2025-04-10T17:08:28.267399Z

Haha, well it usually takes me more than 30 seconds to “grok” any piece of code of more than 5-10 lines… even if it’s sufficiently simple 🙂 I am not an LLM 😂

raspasov 2025-04-10T05:03:15.955169Z

I just skimmed the code for 30 seconds so I can’t comment on whether it’s more complicated than needed. One thing I did notice is (ai/get-response…): if that is a blocking IO call, it can cause trouble or even deadlocks, since it’s inside a (go …), depending on your JVM version, how you use core.async, etc. If that’s all managed/taken care of wherever the get-response call goes, disregard 🙂