core-async

2026-02-22T07:14:56.640779Z

Using core.async.flow and needing to inject events back into the flow from a proc function with side effects. Is there a better way?

2026-02-22T07:15:05.584989Z

Clojure core.async.flow (let [flow-ref (atom nil) f (flow/create-flow {:procs {:llm {:proc (flow/process #'llm/complete!) :args {:flow-ref flow-ref :config config}} :ui {:proc (flow/process #'ui/ui!) :args {:config config}} :tools {:proc (flow/process #'imt/run-tool!) :args {...}}} :conns [... [[:llm :to-ui] [:ui :from-llm]] [[:llm :to-tools] [:tools :from-llm]] ...})] (reset! flow-ref f) f) Then in llm proc, I have: (defn inject! [{:keys [state channel-path event]}] (let [flow-ref (:flow-ref state)] (when @flow-ref (flow/inject @flow-ref channel-path event)))) Then I invoke, as an http request the complete function, and and as I receive SSE events from Open API API, I inject back into the flow: (inject-in-flow! {:state state :channel-path [:llm :to-ui] :event [[:agents.domain.chat/llm-reply-requested {:id id :mulog/ctx local-context :event-trace event-trace }]]})

serefayar 2026-04-13T20:11:43.384339Z

In a project I'm working on, I had a similar challenge with LLM streaming. I went with self-loop ports instead of inject: {:ins {:in "input" self-in "loop continuation"} :outs {self-out "loop back" token-out "stream token" done-out "final"}} [[:llm self-out] [:llm self-in]] [[:llm token-out] [:stream-sink :token]] The proc just returns {self-out next-state} to continue, {token-out token} for each SSE chunk, and {done-out result} when done. A separate sink proc collects tokens. This way the proc doesn't need to know about the flow. jm2c.

2026-02-22T15:52:32.825759Z

https://git.sr.ht/~hiredman/resderelictae/tree/master/item/irc.clj#L399 is an outdated (I think there maybe cleaner ways to do this with newer flow features) example of connecting a proc to an external event source

2026-02-22T15:52:52.925579Z

I think using inject for that is kind of gross

phronmophobic 2026-02-22T16:34:11.941219Z

> Then I invoke, as an http request the complete function, and > and as I receive SSE events from Open API API, I inject back into the flow: Rather than communicate via inject, communicate via channels. Two ways to do that are • start an async/thread in the llm proc's init that can talk to the llm proc via channels in ::flow/in-ports, ::flow/out-ports • have a proc for initiating http requests and streaming http results. The http proc can accept urls via an :in chan and stream results back to a provided [pid inid] .

👍 1
2026-02-22T17:17:48.377399Z

I will check it, thank you

2026-04-14T18:16:12.306469Z

Hey, this sound quite interesting, the way you're doing it. Thanks for sharing