I'm using core.async.flow and need to inject events back into the flow from a proc function with side effects. Is there a better way?
Clojure core.async.flow

    (let [flow-ref (atom nil)
          f (flow/create-flow
             {:procs {:llm   {:proc (flow/process #'llm/complete!)
                              :args {:flow-ref flow-ref :config config}}
                      :ui    {:proc (flow/process #'ui/ui!)
                              :args {:config config}}
                      :tools {:proc (flow/process #'imt/run-tool!)
                              :args {...}}}
              :conns [...
                      [[:llm :to-ui] [:ui :from-llm]]
                      [[:llm :to-tools] [:tools :from-llm]]
                      ...]})]
      (reset! flow-ref f)
      f)

Then in the llm proc, I have:

    (defn inject! [{:keys [state channel-path event]}]
      (let [flow-ref (:flow-ref state)]
        (when @flow-ref
          (flow/inject @flow-ref channel-path event))))

Then I invoke the complete function as an HTTP request, and as I receive SSE events from the OpenAI API, I inject them back into the flow:

    (inject-in-flow! {:state state
                      :channel-path [:llm :to-ui]
                      :event [[:agents.domain.chat/llm-reply-requested
                               {:id id
                                :mulog/ctx local-context
                                :event-trace event-trace}]]})
All the code is here: https://github.com/DanBunea/TodoMVC-AI-Agent/blob/main/components/agents/src/agents/use_cases/chat/agent_builder.clj
In a project I'm working on, I had a similar challenge with LLM streaming. I went with self-loop ports instead of inject:

    {:ins  {:in "input" self-in "loop continuation"}
     :outs {self-out "loop back" token-out "stream token" done-out "final"}}

    [[:llm self-out] [:llm self-in]]
    [[:llm token-out] [:stream-sink :token]]

The proc just returns {self-out next-state} to continue, {token-out token} for each SSE chunk, and {done-out result} when done. A separate sink proc collects tokens. This way the proc doesn't need to know about the flow. jm2c.
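For readers who want to see the self-loop idea spelled out, here is a minimal sketch of what such a step function might look like. It is not the poster's actual code: the keyword port names (:self-in, :self-out, :token-out, :done-out), the message shape ({:id ... :chunks [...]}), and the simulate helper are all assumptions made for illustration, and the SSE stream is replaced by an in-memory vector of chunks so the example runs without an HTTP client.

```clojure
;; Sketch of a self-looping step fn (arities: describe, init, transition, transform).
;; Port names and message shape are hypothetical, not taken from the thread.
(defn llm-step
  ([] {:ins  {:in      "request, e.g. {:id 1 :chunks [\"Hel\" \"lo\"]}"
              :self-in "loop continuation"}
       :outs {:self-out  "loop back to :self-in"
              :token-out "one streamed token"
              :done-out  "final result"}})
  ([_args] {})                    ; init: this sketch keeps no proc state
  ([state _transition] state)     ; lifecycle transitions: nothing to clean up
  ([state in-id {:keys [id chunks acc] :as msg}]
   (case in-id
     ;; A new request starts the loop by sending itself a continuation.
     :in      [state {:self-out [(assoc msg :acc [])]}]
     ;; Each continuation emits one token and loops, or finishes when no chunks remain.
     :self-in (if-let [[tok & more] (seq chunks)]
                [state {:token-out [{:id id :token tok}]
                        :self-out  [{:id id :chunks (vec more) :acc (conj acc tok)}]}]
                [state {:done-out [{:id id :text (apply str acc)}]}]))))

;; Wiring, as in the message above (the :stream-sink proc is not shown here):
;;   (flow/create-flow
;;    {:procs {:llm {:proc (flow/process #'llm-step)} ...}
;;     :conns [[[:llm :self-out] [:llm :self-in]]
;;             [[:llm :token-out] [:stream-sink :token]]]})

(defn simulate
  "Drives llm-step by hand, without a running flow: every :self-out message
  is fed back in on :self-in, and tokens and the final result are collected."
  [request]
  (loop [queue [[:in request]] tokens [] done nil]
    (if-let [[[in-id msg] & more] (seq queue)]
      (let [[_state outs] (llm-step {} in-id msg)]
        (recur (into (vec more) (map (fn [m] [:self-in m]) (:self-out outs)))
               (into tokens (map :token (:token-out outs)))
               (or (first (:done-out outs)) done)))
      {:tokens tokens :done done})))
```

Calling (simulate {:id 1 :chunks ["Hel" "lo"]}) walks the loop step by step. Because the step function only returns data, it can be exercised like this without a flow, which is the property the message above is pointing at.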
https://git.sr.ht/~hiredman/resderelictae/tree/master/item/irc.clj#L399 is an outdated (I think there may be cleaner ways to do this with newer flow features) example of connecting a proc to an external event source
I think using inject for that is kind of gross
> Then I invoke the complete function as an HTTP request, and as I receive SSE events from the OpenAI API, I inject them back into the flow:
Rather than communicate via inject, communicate via channels. Two ways to do that are
• start an async/thread in the llm proc's init that can talk to the llm proc via channels in ::flow/in-ports, ::flow/out-ports
• have a proc for initiating http requests and streaming http results. The http proc can accept URLs via an :in chan and stream results back to a provided [pid inid].
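The first option can be sketched roughly as follows. This is an illustration under stated assumptions, not a tested recipe: fake-sse-events is a hypothetical stand-in for the blocking HTTP/SSE read, the port ids :req and :sse and the plain :req-chan / :sse-chan state keys are made up for the example, and it relies on the documented behaviour that channels returned under ::flow/in-ports and ::flow/out-ports in the init state become extra inputs and outputs of the process.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.flow :as flow])

(defn fake-sse-events
  "Hypothetical stand-in for a blocking SSE read: one event per word of the prompt."
  [{:keys [id prompt]}]
  (map (fn [word] {:id id :delta word}) (re-seq #"\S+" prompt)))

(defn llm-step
  ([] {:ins  {:in "completion requests, e.g. {:id 1 :prompt \"...\"}"}
       :outs {:to-ui "streamed reply events"}})
  ([_args]
   (let [req-chan (async/chan 8)     ; proc -> worker thread
         sse-chan (async/chan 64)]   ; worker thread -> proc
     ;; The worker lives outside the flow and talks to the proc only via channels.
     (async/thread
       (loop []
         (when-some [req (async/<!! req-chan)]
           (doseq [event (fake-sse-events req)]
             (async/>!! sse-chan event))
           (recur)))
       ;; req-chan was closed: no more events will arrive.
       (async/close! sse-chan))
     {:req-chan req-chan
      :sse-chan sse-chan
      ::flow/in-ports  {:sse sse-chan}
      ::flow/out-ports {:req req-chan}}))
  ([state transition]
   ;; The proc owns these external channels, so it shuts the worker down on stop.
   (when (= transition ::flow/stop)
     (async/close! (:req-chan state)))
   state)
  ([state in-id msg]
   (case in-id
     :in  [state {:req [msg]}]        ; hand the request to the worker
     :sse [state {:to-ui [msg]}])))   ; forward each streamed event downstream
```

With this shape the step function never touches the flow object: requests leave through the :req port, streamed events come back in on :sse like any other input, and the atom holding the flow is no longer needed.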
I will check it, thank you
Hey, this sounds quite interesting, the way you're doing it. Thanks for sharing