I set about implementing the OpenAI agent loop in Clojure using core.async. I thought it would be quite simple, but it ended up being a lot less fun than I anticipated. Thanks to a boatload of unit tests I believe it works now, but I'm wondering whether I overcomplicated things. Putting it here for educational purposes.
```clojure
(ns tools
  (:require [clojure.core.async :as async]))

(defn execute-functions
  "Executes all function calls and returns their results asynchronously on a promise-chan.
  Args:
  - function-calls: maps from OpenAI, containing the following keys:
    - call_id: the call ID of the function call that ties it to the function output
    - name: the name of the function, used to identify which function to execute
    - arguments: a JSON-serialized string of arguments to pass to the function as an object
  - context-config: a map containing pieces of config needed to run the context tools
  Returns:
  - A promise-chan containing the results of the function calls. The results are returned
    as a single flat vector, with each function call followed by its result (pairwise)."
  [function-calls context-config]
  ;; run all function calls in parallel - some may be blocking, so each one
  ;; gets its own thread instead of a go block
  (let [output (async/promise-chan)
        ;; mapv is eager, so every thread starts right away
        result-chans
        (mapv
         (fn [call]
           (async/thread
             ;; execute-function (not shown) is potentially blocking and returns
             ;; the formatted function call output.
             ;; NOTE: if it throws, async/thread closes its channel without a
             ;; value, so that pair is silently missing from the results.
             [call (execute-function call context-config)]))
         function-calls)]
    ;; collect all the results; merge yields them in completion order,
    ;; but each [call output] pair stays together
    (async/go
      (->> result-chans
           async/merge
           (async/into [])
           async/<!
           (into [] cat) ;; concatenate one level: [call output call output ...]
           (async/>! output)))
    output))
```
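A caveat about `flatten` at the collection step: it recurses into every sequential value, so if `execute-function` ever returned a vector or list rather than a map, its elements would be spliced into the result and the call/output pairing would be lost. Concatenating exactly one level with `cat` avoids that. A small sketch with hypothetical call and output data:

```clojure
(ns flatten-vs-cat)

;; Hypothetical [call output] pairs, shaped like what execute-functions collects.
(def pairs
  [[{:type "function_call" :call_id "a"} {:type "function_call_output" :call_id "a"}]
   ;; an output that happens to be a vector of two items
   [{:type "function_call" :call_id "b"} [{:part 1} {:part 2}]]])

;; flatten descends into the nested vector (maps are not sequential, so
;; they are left alone), which breaks the call/output pairing
(def flattened (flatten pairs))

;; cat concatenates exactly one level, so each output stays one element
(def concatenated (into [] cat pairs))

(println (count flattened) (count concatenated))
```

With `flatten` the second output turns into two separate elements; with `cat` it stays a single element that follows its call.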
```clojure
(ns conversation
  (:require [clojure.core.async :as async]
            [ai]      ;; assumed: the namespace providing get-response (not shown)
            [tools]))

(defn separate-functions-and-text
  "Reads a streaming output from OpenAI and splits the events into text deltas and function calls.
  The new API can return both in a single call, so this function accounts for that by
  returning three channels.
  Args:
  - stream: the streaming output from OpenAI
  Returns:
  - [text-chan function-call-chan assistant-chan]:
    - text-chan: a stream of text deltas, with a .done event at the end
    - function-call-chan: a single vector of all function calls
    - assistant-chan: the assistant message, needed for multiple turns and persistence"
  [stream]
  (let [text-chan (async/chan)
        function-call-chan (async/promise-chan)
        assistant-chan (async/promise-chan)]
    (async/go-loop [function-calls []]
      (if-let [event (async/<! stream)]
        (case (-> event :type keyword)
          :response.output_text.delta
          (do (async/>! text-chan event)
              (recur function-calls))

          :response.output_text.done
          (do (async/>! text-chan event)
              (async/>! assistant-chan {:role :assistant :content (:text event)})
              (recur function-calls))

          :response.output_item.done
          (if (= :function_call (-> event :item :type keyword))
            (recur (conj function-calls (:item event)))
            (recur function-calls))

          :response.completed
          (do (async/close! stream)
              (async/close! text-chan)
              (async/close! assistant-chan)
              (async/>! function-call-chan function-calls))

          ;; any other event type is skipped
          (recur function-calls))
        ;; the stream closed without a :response.completed event: close every
        ;; output so consumers read nil instead of waiting forever
        (do (async/close! text-chan)
            (async/close! assistant-chan)
            (async/close! function-call-chan))))
    [text-chan function-call-chan assistant-chan]))
```
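On the "did I overcomplicate this?" question, one option is to pull the event classification out of the go-loop into a pure function that can be unit tested without any channels. A minimal sketch; `split-events` and the sample events are hypothetical, and the event maps are assumed to have the shape the code above matches on:

```clojure
(ns split-events-sketch)

(defn split-events
  "Hypothetical pure counterpart of separate-functions-and-text: folds a finite
  sequence of streaming events into the text events, the function calls and the
  first assistant message. Stops at :response.completed."
  [events]
  (reduce
   (fn [acc event]
     (case (keyword (:type event))
       :response.output_text.delta
       (update acc :text-events conj event)

       :response.output_text.done
       (-> acc
           (update :text-events conj event)
           ;; keep the first message, like the promise-chan does
           (update :assistant #(or % {:role :assistant :content (:text event)})))

       :response.output_item.done
       (if (= :function_call (keyword (get-in event [:item :type])))
         (update acc :function-calls conj (:item event))
         acc)

       :response.completed
       (reduced acc)

       ;; any other event type is skipped
       acc))
   {:text-events [] :function-calls [] :assistant nil}
   events))

;; Hypothetical sample events, using the type strings matched above.
(def sample-events
  [{:type "response.output_text.delta" :delta "Hel"}
   {:type "response.output_text.delta" :delta "lo"}
   {:type "response.output_text.done" :text "Hello"}
   {:type "response.output_item.done"
    :item {:type "function_call" :call_id "c1" :name "lookup" :arguments "{}"}}
   {:type "response.completed"}
   {:type "response.output_text.delta" :delta "ignored"}])

(split-events sample-events)
```

Streaming still needs the channel version, since text deltas have to reach the client before the response completes; a pure function like this would complement it for testing the classification rules rather than replace it.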
```clojure
(defn process-turn
  "Processes a single turn from OpenAI streaming, including function calls.
  This is done asynchronously, so the client stream is available immediately.
  Inputs:
  - input: must be a vector with the standard type for OpenAI
  - model: a valid OpenAI model
  - text: text configuration for structured outputs
  - instructions: instructions for the model (previously known as the system prompt)
  - context-config: a map containing pieces of config needed to run the context tools
  Returns:
  - a channel that streams text back to the client (many maps)
  - a channel containing an error, if one occurred (one or none)
  - a channel with important events to persist in the db for the next turn (a vector)"
  [& {:keys [input model text instructions context-config]
      :or {model :gpt-4o-mini
           text nil
           instructions "You are a helpful assistant."
           context-config {}}}]
  (let [client-chan (async/chan 1024)
        error-chan (async/promise-chan)
        db-chan (async/promise-chan)]
    (async/go-loop [input (vec input)] ;; keep input a vector across turns
      ;; get-response returns a chan containing either an error or a chan with events
      (let [response (async/<! (ai/get-response :input input
                                                :model model
                                                :instructions instructions
                                                :text text
                                                :tools tools/tools
                                                :stream true))]
        (if (instance? Throwable response)
          (do (async/>! error-chan response)
              (async/>! db-chan [])
              (async/close! client-chan))
          (let [[text-chan function-call-chan assistant-chan]
                (separate-functions-and-text response)
                ;; forward text to the client as it arrives (before the end of
                ;; the stream); `forwarded` closes once text-chan is drained
                forwarded (async/go-loop []
                            (when-some [event (async/<! text-chan)]
                              (async/>! client-chan event)
                              (recur)))]
            ;; wait until every text event is on client-chan, so closing
            ;; client-chan below cannot drop the last deltas
            (async/<! forwarded)
            (let [function-calls (seq (async/<! function-call-chan))
                  ;; nil when the model only called tools
                  assistant-message (async/<! assistant-chan)]
              (if function-calls
                (recur (cond-> (into input (async/<! (tools/execute-functions
                                                       function-calls context-config)))
                         assistant-message (conj assistant-message)))
                (do (async/close! error-chan)
                    (async/close! client-chan)
                    (async/>! db-chan (cond-> input
                                        assistant-message (conj assistant-message))))))))))
    [client-chan error-chan db-chan]))
```

It’s not blocking. It immediately returns a channel, and that channel holds either a single error or a single channel that contains the values streamed from OpenAI.
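The contract described here can be sketched with a stand-in for `ai/get-response`: the function returns a channel immediately, and that channel later holds either one `Throwable` or one channel of events, which is what the `(instance? Throwable response)` check in `process-turn` relies on. `fake-get-response` and `response->result` are hypothetical names, the events are canned, and the sketch assumes core.async 1.2 or later for `to-chan!`:

```clojure
(ns response-contract-sketch
  (:require [clojure.core.async :as async]))

(defn fake-get-response
  "Hypothetical stand-in for ai/get-response: returns a promise-chan right
  away; it later holds either a Throwable or a channel of events."
  [{:keys [fail?]}]
  (let [result (async/promise-chan)]
    (async/go
      (if fail?
        (async/>! result (ex-info "request failed" {:status 500}))
        ;; to-chan! returns a channel that delivers these events, then closes
        (async/>! result (async/to-chan! [{:type "response.output_text.delta" :delta "Hi"}
                                          {:type "response.completed"}]))))
    result))

(defn response->result
  "Hypothetical consumer mirroring the Throwable check in process-turn:
  returns a channel holding {:error e} or {:events [...]}."
  [response-chan]
  (async/go
    (let [response (async/<! response-chan)]
      (if (instance? Throwable response)
        {:error response}
        ;; drain the events channel into a vector once it closes
        {:events (async/<! (async/into [] response))}))))
```

Because the outer channel carries a value of either type, the caller has to branch on it before touching the events; an alternative design would be two separate channels, as `process-turn` itself does with `error-chan`.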
Also:
> I just skimmed the code for 30 seconds so I can’t comment on whether it’s more complicated than needed.

Doesn't that say all it needs to say? It was too complicated to grok in 30 seconds?
Haha, well it usually takes me more than 30 seconds to “grok” any piece of code of more than 5-10 lines… even if it’s sufficiently simple 🙂 I am not an LLM 😂
I just skimmed the code for 30 seconds so I can’t comment on whether it’s more complicated than needed.
One thing that I did notice is:
(ai/get-response…)
If that is a blocking IO call, it can cause trouble, even deadlocks, since it’s inside a (go …) block. How much depends on your JVM version, the way you use core.async, etc., etc., etc…
If that’s all managed/taken care of wherever the get-response call goes, disregard 🙂
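To make the concern concrete: in the classic setup, go blocks share a small fixed thread pool (8 threads by default), so a blocking call made directly inside `(go …)` ties up one of those threads for the whole call, and enough of them at once can stall every other go block. The usual remedy is to run the blocking call on `async/thread` and only park on the channel it returns. A minimal sketch, where `slow-http-call` is a hypothetical stand-in that just sleeps:

```clojure
(ns blocking-io-sketch
  (:require [clojure.core.async :as async]))

(defn slow-http-call
  "Hypothetical blocking call standing in for a synchronous HTTP request."
  [x]
  (Thread/sleep 100)
  {:echo x})

;; Risky: the blocking call occupies a go-pool thread for its whole duration.
(defn fetch-in-go-risky [x]
  (async/go (slow-http-call x)))

;; Safer: async/thread runs the call on its own thread and returns a channel;
;; the go block only parks on that channel, freeing its pool thread meanwhile.
(defn fetch-in-go [x]
  (async/go
    (let [result (async/<! (async/thread (slow-http-call x)))]
      (assoc result :via :thread))))
```

This is the same pattern `execute-functions` already uses for the tool calls, so wrapping the HTTP request the same way (or confirming that `ai/get-response` already does) would address the concern.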