Oh shiny! https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/flow.clj
> A library for building concurrent, event driven data processing flows out of communication-free functions, while centralizing control, reporting, execution and error handling. Built on core.async.
Cycles are possible (it's on you to avoid infinite loops and deadlocks)
The uniqueness of ids across ins/outs within a proc is so that we get a uniform coordinate system for talking about channels: [pid cid]. Otherwise you'd have to have [pid :in-or-out cid]. In any case, cycles are effected by making entries in the :conns, not by name sharing, so [[:p :out] [:p :in]] is a direct cycle.
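Because cycles come only from :conns entries, they can be spotted by looking at the conns data alone. A minimal sketch, assuming each conn has the [[from-pid out-id] [to-pid in-id]] shape used throughout this thread; `conns->graph`, `reachable` and `cyclic?` are hypothetical helpers, not part of flow:

```clojure
(defn conns->graph
  "Adjacency map {from-pid #{to-pid ...}} built from a flow's :conns."
  [conns]
  (reduce (fn [g [[from _out] [to _in]]]
            (update g from (fnil conj #{}) to))
          {}
          conns))

(defn reachable
  "Set of pids reachable by following one or more conns from start."
  [g start]
  (loop [frontier (vec (get g start))
         seen #{}]
    (if-let [[n & more] (seq frontier)]
      (if (seen n)
        (recur (vec more) seen)
        (recur (into (vec more) (get g n)) (conj seen n)))
      seen)))

(defn cyclic?
  "True if some proc can reach itself, e.g. the direct cycle [[:p :out] [:p :in]]."
  [conns]
  (let [g (conns->graph conns)]
    (boolean (some #(contains? (reachable g %) %) (keys g)))))

(cyclic? [[[:p :out] [:p :in]]])                      ;; => true
(cyclic? [[[:a :out] [:b :in]] [[:b :out] [:c :in]]]) ;; => false
```

Only procs, not ports, are nodes here, so a cycle through different ports of the same proc still counts.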
I'm interested in more concrete dynamic flow use cases than just emulating Erlang/OTP. This isn't actors, and the actor assumption that 'all interactions are potential cross-proc wire interactions with potential cross-proc wire failures' doesn't align with an in-proc system like this. If your proc is failing, it's because either a) there's a normal situation you haven't considered, so it is currently exceptional, or b) you have a bug you need to fix. You will be able to see your failures in one place and stop the flow if that makes sense for your app. I'm not going to introduce the 'just fail and vanish' semantics of Erlang here.
I am thinking about cases like a network server having a process per connection, which is sort of cloning/forking: each new process is identical except for its start data. It might be doable to wire something up using a ProcLauncher that does this already, though: a process that makes pubs out of its inputs and launches a new instance of some example process.
An example use case for dynamic flows is a video player where you can add/remove filters while the video is playing.
A second use case for dynamic data flows is for live coding data dashboards. For example, https://www.datarabbit.com/ uses a similar library to core.async.flow called https://github.com/ryrobes/flowmaps/. In principle, the data feeds for these dashboards aren't live, so you could rebuild the flow and replay, but it's useful to build the data flow as you go while inspecting and building on intermediate results.
I've looked into https://github.com/ryrobes/flowmaps/ previously and ended up not using it because 1) it only seems to support triggering processes when all inputs are available and 2) flowmaps does a bunch of extra stuff I don't care about.
I worked on a trading strategy recently where the price of one particular asset dictated some number of other assets' price feeds I had to be subscribed to (so it would need to be subscribing and unsubscribing semi-regularly).
I definitely consider the interactive flow editing case out of scope
@hiredman in the network server scenario in what way do the per-connection processes form/join a single flow, i.e. different from just spinning up a new flow per?
you still have core.async with which to dynamically connect different flows, and do pub/sub etc
some of the core.async primitives might become (fixed, logic-free) utility processes
e.g. pub/sub, mult/tap, pipeline
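As an illustration of a fixed, logic-free utility proc, here is a mult-like broadcast written as a plain step function, using the same arity convention as the ddupe example in this thread (0 args: describe, 1 arg: initial state, 3 args: state, in-id and message in, [state outputs] out). `broadcast-step` is hypothetical; nothing here implies flow ships such a utility or that this is how it would be built:

```clojure
(defn broadcast-step
  "Hypothetical utility step fn: copy every message arriving on :in to each out-id."
  [out-ids]
  (fn
    ;; describe: one input, one output per out-id
    ([] {:ins {:in "messages to copy"}
         :outs (zipmap out-ids (repeat "a copy of every :in message"))})
    ;; init: no state needed
    ([_args] {})
    ;; transform: same message, once per output
    ([state _in msg]
     [state (zipmap out-ids (repeat [msg]))])))

((broadcast-step [:a :b]) {} :in 42)
;; => [{} {:a [42], :b [42]}]
```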
incrementally mutating a flow to wire something in is a great way to make a mess and engender a whole bunch more failure modes, that's why I'm loath to just go there
I got as far as the idea of a patch, a new sub-flow containing procs and conns, some of which conns can refer to the existing flow.
similarly needs 'cleave'
definitely not add/remove proc/conn
disconnecting cleanly much tougher than joining
My mental model of flows is heavily influenced by playing too much factorio. I understand that modifying flows can easily lead to a mess, but I can't think of what new error modes there would be.
there are some efficiencies (e.g. the direct wiring that occurs for 1:1 connections) you'd have to give up in order to be ready for dynamic modification without having the mutability seep everywhere
> the direct wiring that occurs for 1:1 connections
I didn't know flows would do that. That's neat.
new error mode - you're blocked writing to someone who's gone
again, adding overhead you can get some protection
e.g. all connections become pipes or taps
all outs are reified chans (they aren't now per above)
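The "all connections become taps" trade-off can be seen with plain core.async: if an out is a `mult`, a consumer can be detached with `untap`, and the producer keeps delivering to the remaining taps instead of waiting on the departed reader. A sketch using only `mult`, `tap` and `untap`; it says nothing about how flow actually wires outs internally, and `demo-untap` is a hypothetical name:

```clojure
(require '[clojure.core.async :as async])

(defn demo-untap
  "Producer writes to out-ch; consumers a and b are taps on a mult over it.
  b is untapped after the first message."
  []
  (let [out-ch (async/chan)
        m (async/mult out-ch)
        a (async/chan 10)
        b (async/chan 10)]
    (async/tap m a)
    (async/tap m b)
    (async/>!! out-ch :first)
    ;; read both copies before untapping, so the untap can't race delivery of :first
    (let [a1 (async/<!! a)
          b1 (async/<!! b)]
      (async/untap m b)
      (async/>!! out-ch :second)   ; only a receives this one
      {:a [a1 (async/<!! a)]
       :b [b1 (async/poll! b)]})))

(demo-untap)
;; => {:a [:first :second], :b [:first nil]}
```

The cost is the extra hop through the mult for every message, which is the kind of overhead the direct 1:1 wiring avoids.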
anyway, I've thought about it and it's not ruled out. Thanks all for the input
I guess I'm mostly thinking of dynamic flows in the context of development, where missing connections and processes aren't a problem. Ignoring any implementation details, it seems like writing to a missing process is the same as writing to a connection whose processes are blocked and aren't consuming values.
except you expect the latter to clear and the former can't ever
gotta run
> except you expect the latter to clear and the former can't ever
That's not strictly true in the dynamic case, since you can add a new process in the future that does start reading from the connection.
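Either way, a writer can protect itself from a reader that may never come back by bounding the put. A sketch with `alts!!` and `timeout`; `put-or-give-up` is a hypothetical helper, and what a timeout should mean (drop, report, retry) is left to the caller:

```clojure
(require '[clojure.core.async :as async])

(defn put-or-give-up
  "Try to put v on ch, waiting at most ms milliseconds.
  Returns true if the put completed, false on timeout or if ch is closed."
  [ch v ms]
  (let [t (async/timeout ms)
        [result port] (async/alts!! [[ch v] t])]
    ;; a put op yields [true ch] on success, [false ch] if ch is closed;
    ;; the timeout channel yields [nil t]
    (and (= port ch) (true? result))))

(put-or-give-up (async/chan) :x 50)   ;; no reader, no buffer: gives up after ~50ms
(put-or-give-up (async/chan 1) :x 50) ;; buffer has room: succeeds immediately
```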
Maybe it's interesting to somebody: I converted my entire real-time AI calls library to core.async.flow, and it is just awesome!
Pros:
2. It's faster than my previous implementation (mainly because every processor has its own thread)
2. Changing from the pipeline abstraction to a graph simplifies things a lot
3. The entire graph is described as pure functions that interact with each other. This makes me feel so much more relaxed, knowing that everything can be individually tested and further developed.
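Point 3 can be seen with the ddupe step function from the dice example in this thread: a step function is just a multi-arity fn, so its transform arity can be driven over a sequence of messages with no channels or threads involved. `run-step` is a hypothetical test helper:

```clojure
(defn ddupe
  ([] {:ins {:in "stuff"}
       :outs {:out "stuff w/o consecutive dupes"}})
  ([_] {:last nil})
  ([{:keys [last]} _ v]
   [{:last v} (when (not= last v) {:out [v]})]))

(defn run-step
  "Hypothetical test helper: init the step fn, feed msgs to its transform arity
  on input in-id, and collect everything it emits on output out-id."
  [step in-id out-id msgs]
  (loop [state (step {})
         msgs (seq msgs)
         emitted []]
    (if msgs
      (let [[state' outputs] (step state in-id (first msgs))]
        (recur state' (next msgs) (into emitted (get outputs out-id))))
      emitted)))

(run-step ddupe :in :out [1 1 2 2 1]) ;; => [1 2 1]
```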
Thank you, @richhickey, for core.async.flow - it now has an intuitive design, and you can describe complex interactions as flows.
Here's the PR for anybody interested: https://github.com/shipclojure/voice-fn/pull/1
did you see the datafy support, as well as the new ping returns data behavior?
I guess you want lint-time, but still are creating procs, side effects of which will run
we have discussed using metadata for describe data, that would be compile/lint-time clear of effects, but has other tradeoffs
tbd if user commands can modify state, if so you won't need alter-state
not that a proc ctor should have any effects, that should all wait for init
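One way the metadata idea could look: the describe map lives on the var, so a linter can read port names without calling or constructing anything, while the 0-arity still returns the same data. This is only a sketch of the idea under discussion; the `::describe` key and `ports-of` helper are hypothetical, and flow does not read this metadata:

```clojure
(defn ddupe
  {::describe {:ins {:in "stuff"}
               :outs {:out "stuff w/o consecutive dupes"}}}
  ([] (::describe (meta #'ddupe)))   ; describe arity just returns the metadata
  ([_] {:last nil})
  ([{:keys [last]} _ v]
   [{:last v} (when (not= last v) {:out [v]})]))

(defn ports-of
  "Read port names from a var's ::describe metadata; nothing is invoked."
  [v]
  (let [{:keys [ins outs]} (::describe (meta v))]
    {:ins (set (keys ins)) :outs (set (keys outs))}))

(ports-of #'ddupe) ;; => {:ins #{:in}, :outs #{:out}}
```

One trade-off is visible right away: this only works when you have the var, not an anonymous fn or a proc built at runtime.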
Ah, didn't occur to me to look at datafy/nav since I had the flow defining map in hand. But I guess with datafy I would be able to do the lint thing on a flow object, not just the definition map
yeah, a created but un-started flow should be as effect-free as an un-inited proc
::flow/pid now passed with args
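A sketch of how a step function might use that pid for reply-to, assuming it arrives in the init args under `:clojure.core.async.flow/pid` (what `::flow/pid` expands to with flow aliased to clojure.core.async.flow). `requester`, its `:replies` input and the message shape are hypothetical; whether another proc can actually address `[pid :replies]` depends on flow's reply-to support:

```clojure
(defn requester
  ([] {:ins {:in "work to request"
             :replies "answers sent back by other procs"}
       :outs {:out "requests tagged with a reply-to address"}})
  ;; init: remember our own pid from the args
  ([args] {:pid (:clojure.core.async.flow/pid args) :answers []})
  ([{:keys [pid] :as state} in msg]
   (case in
     ;; [pid cid] is the same coordinate a conn uses to name an input
     :in [state {:out [{:request msg :reply-to [pid :replies]}]}]
     :replies [(update state :answers conj msg) nil])))
```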
flow config maps might be created by programs and not evident in source
a few things that might be useful for playing with flow
(require '[clojure.set :as set]
         '[clojure.core.async.flow.spi :as spi])
(defn lint-connections
  "Given a map that defines a flow, return a map that lists ins and outs
  of processes that are not connected and connections that mention
  non-existent ins and outs"
  [flow-def]
  (let [ins (set (for [[pid {:keys [proc]}] (:procs flow-def)
                       [in-port _] (:ins (spi/describe proc))]
                   [pid in-port]))
        outs (set (for [[pid {:keys [proc]}] (:procs flow-def)
                        [out-port _] (:outs (spi/describe proc))]
                    [pid out-port]))
        connected-outs (set (map first (:conns flow-def)))
        connected-ins (set (map second (:conns flow-def)))]
    {:unconnected-ins (set/difference ins connected-ins)
     :unconnected-outs (set/difference outs connected-outs)
     :non-existent-ins (set/difference connected-ins ins)
     :non-existent-outs (set/difference connected-outs outs)}))
(defn wrap-alter-state
  "Add an input for a step process to enable sending functions to
  manipulate its state."
  [f]
  (fn
    ;; describe: add an ::alter-state input
    ([]
     (assoc-in (f) [:ins ::alter-state] ""))
    ;; init
    ([args]
     (f args))
    ;; transition
    ([state transition]
     (f state transition))
    ;; transform: apply (::fn message) to state, or delegate to f
    ([state in message]
     (if (= in ::alter-state)
       [(apply (::fn message) state (::args message))]
       (f state in message)))))
I have (tap> (lint-connections flow-def)) as a top level form in my source file, so I get that linting info every time I reload the file. wrap-alter-state is new, the intent is to allow for poking at process state from the repl using inject
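To exercise wrap-alter-state without starting a flow, wrap a small step function and call its arities directly. The `counter` step function is hypothetical, and the wrapper is copied so the block runs on its own:

```clojure
(defn wrap-alter-state [f]
  (fn
    ([] (assoc-in (f) [:ins ::alter-state] ""))
    ([args] (f args))
    ([state transition] (f state transition))
    ([state in message]
     (if (= in ::alter-state)
       [(apply (::fn message) state (::args message))]
       (f state in message)))))

;; Hypothetical step fn used only for this demo.
(defn counter
  ([] {:ins {:in "anything"} :outs {:out "running count"}})
  ([_args] {:n 0})
  ([state _transition] state)
  ([{:keys [n]} _in _msg]
   (let [n' (inc n)]
     [{:n n'} {:out [n']}])))

(def wrapped (wrap-alter-state counter))

(wrapped {:n 0} :in :tick)                                  ;; => [{:n 1} {:out [1]}]
(wrapped {:n 41} ::alter-state {::fn assoc ::args [:n 0]}) ;; => [{:n 0}]
```

In a running flow the alter message would presumably be injected like the other injects in this thread, e.g. `(flow/inject g [:counter ::alter-state] [{::fn assoc ::args [:n 0]}])` with a hypothetical :counter pid. Note that `::alter-state`, `::fn` and `::args` must resolve in the namespace that defines wrap-alter-state.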
dynamic read subsets are in now, also a streamlined way to create sources and sinks via 'ports'
thus introduce is gone
generating docs here atm https://clojure.github.io/core.async/clojure.core.async.flow.html
Yes! This solves my issue! Thank you, Rich!
@richhickey wrote this on reddit too but I think I better put it here. Last 2 days I tested flow to integrate it in https://github.com/shipclojure/voice-fn
I'm trying to replicate a more realistic use case and I can't figure out how to send data down the pipeline when the data is received asynchronously (either through callbacks or a go/loop). Currently the examples show that the function should return [state, [chan msg]], but this can't happen in an async context.
Example:
I have a :proc that, when started, creates a websocket connection. It sends to the ws connection all the input "packets" it receives from its in, and when it receives events back from the ws, the processor should send them further down the pipeline.
My current attempt:
:deepgram-transcriptor
{:proc
 (flow/process
  {:describe
   (fn [] {:ins {:sys-in "Channel for system messages that take priority"
                 :in "Channel for audio input frames (from transport-in)"}
           :outs {:sys-out "Channel for system messages that have priority"
                  :out "Channel on which transcription frames are put"}
           :params {:deepgram/api-key "Api key for deepgram"}
           :workload :io})
   :init
   (fn [args]
     (let [websocket-url (deepgram/make-websocket-url args)
           conn-config {:headers {"Authorization" (str "Token " (:deepgram/api-key args))}
                        :on-open (fn [ws]
                                   (t/log! :info "Deepgram websocket connection open"))
                        :on-message (fn [_ws ^HeapCharBuffer data _last?]
                                      (let [m (u/parse-if-json (str data))]
                                        (cond
                                          (deepgram/speech-started-event? m)
                                          ;; (send-frame! pipeline (frame/user-speech-start true))
                                          (prn "Send speech started frame down the pipeline")
                                          (deepgram/utterance-end-event? m)
                                          ;; (send-frame! pipeline (frame/user-speech-stop true))
                                          (prn "Send speech stopped frame down the pipeline")
                                          (deepgram/final-transcript? m)
                                          ;; (send-frame! pipeline (frame/transcription trsc))
                                          (prn "send transcription frame down the pipeline")
                                          (deepgram/interim-transcript? m)
                                          ;; (send-frame! pipeline (frame/transcription-interim trsc))
                                          (prn "send interim transcription frame down the pipeline"))))
                        :on-error (fn [_ e]
                                    (t/log! {:level :error :id :deepgram-transcriptor} ["Error" e]))
                        :on-close (fn [_ws code reason]
                                    (t/log! {:level :info :id :deepgram-transcriptor} ["Deepgram websocket connection closed" "Code:" code "Reason:" reason]))}
           _ (t/log! {:level :info :id :deepgram-transcriptor} "Connecting to transcription websocket")
           ws-conn @(ws/websocket websocket-url conn-config)]
       {:websocket/conn ws-conn}))
   ;; Close ws when pipeline stops
   :transition (fn [{:websocket/keys [conn] :as state} transition]
                 (if (and (= transition ::flow/stop)
                          conn)
                   (do
                     (t/log! {:id :deepgram-transcriptor :level :info} "Closing transcription websocket connection")
                     (ws/send! conn deepgram/close-connection-payload)
                     (ws/close! conn)
                     {})
                   state))
   ;; Forward raw audio to the ws; returns [state outputs] with nothing emitted here
   :transform (fn [{:websocket/keys [conn] :as state} _in-name frame]
                (when (and conn (frame/audio-input-raw? frame))
                  (ws/send! conn (:frame/data frame)))
                [state nil])})}
How would I do the sending of events from the :on-message callback in the websocket config in the flow abstraction?
Full example here:
https://github.com/shipclojure/voice-fn/blob/exp/confert-to-core-async-flow/core/src/voice_fn/experiments/flow.clj
@ovidiu.stoica1094 Not totally sure what the intended setup is, but my guess is that you're doing too much in a single process. I would break the process down into simpler pieces:
• An :inject process that injects new messages from the websocket
• An output process that writes to the websocket
• various intermediate processes
I think reading from the websocket is the only async operation? The advice for how to read is from https://github.com/clojure/core.async/commit/03b97e0b3e0ec329629bcbf76106658dce4a5d61#diff-4df867a8cd7775659f653f7a1280ca7356172e7896d4bb8efeddd7109d0f4eb4R180. Not totally sure what API you're using, but you can probably do blocking reads with timeouts in the inject function.
Inject appears to be more of a polling input
There doesn't appear to be a good way to have a process that reads from a channel outside of the flow, other than polling it, which doesn't seem to be ideal?
I've continued to fiddle with a custom process launcher to allow for sort of cloning of processes per connection or whatever, I've got something now that runs, and responds to process pings and forks as needed, but deadlocks when one of the forked processes exits (this should all be wired with pubs and such, so messages going to a non-existent destination shouldn't be a problem, but evidently needs debugging)
> Inject appears to be more of a polling input
I'm not sure, but it seems like you can poll or block in the inject function. You just shouldn't block indefinitely. My guess is that you block, but yield every once in a while for control messages, and then the inject function gets called again.
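A sketch of the block-with-timeout idea for the websocket case: the client callback only enqueues onto a channel, and the injecting function blocks for a bounded time so it regularly returns control. `make-bridge` and `take-with-timeout` are hypothetical helpers; how the channel reaches the injecting function is left open here, and `offer!` is a deliberate choice so the websocket client's thread never blocks (events are dropped when the buffer is full):

```clojure
(require '[clojure.core.async :as async])

(defn make-bridge
  "Hypothetical: a buffered channel plus a callback for the websocket client.
  offer! never blocks; it returns nil (dropping the event) if the buffer is full."
  [buf-size]
  (let [ch (async/chan buf-size)]
    {:ch ch
     :on-message (fn [event] (async/offer! ch event))}))

(defn take-with-timeout
  "Block for at most ms waiting for the next event. Returns nil on timeout
  (or if ch is closed), so the caller regains control regularly."
  [ch ms]
  (let [[v port] (async/alts!! [ch (async/timeout ms)])]
    (when (= port ch) v)))
```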
The flow-per-connection approach would just put off the issue: before, I had a bunch of processes (go loops reading and writing channels) to manage, and now I have a bunch of flows, which is even kind of worse because it is not immediately obvious how to have flows cross-talk (an inject with polling on a channel sort of works for getting messages into a flow, but a sink with a polling offer and some kind of internal buffer just seems horrendous)
Yea, I dunno. My intuition would be to generally not have a flow per connection, especially if the different connections talk to each other.
Exactly
@ovidiu.stoica1094's example code appears to be running into two issues: 1. the same lack of a clear way to connect an external async source or sink to a flow, and 2. processes in a flow don't have a way to access the flow as a whole. Those two issues together mean it is tricky for the callbacks given to the websocket client connection created in a flow to communicate data back to the flow.
If you implement process launcher yourself you have direct access to the channels, so you can more easily do async stuff, that might be the answer
That's not how I would approach it. I would have a function that returns subflows that have clear inputs and outputs. I would then compose the subflows into a single larger flow. The idea being that you could easily replace the websocket source with another source. You could replace the sinks with other sinks.
I mean, if you have a process definition that is a custom process launcher that launches a websocket and wires its ins and outs to the channels, you can just replace that process definition with a different one in a flow
process and step-process lift procedures (fns that are not pure but are not async) into processes (things with state that communicate over channels)
A websocket doesn't really need that lifting
A websocket is actually two processes, one for reading and one for writing. You can also decouple the websocket specific reading and writing from the rest of the logic.
Obviously, you can also smash all that together.
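Decoupling "what to send" from "how to send it" can be as small as a pure function from a websocket event to a step-style outputs map, which the socket-reading proc then only has to call. The event shapes (`:type`, `:final?`, `:text`) and `:frame/type` values below are made up for illustration; they are not Deepgram's payloads or the frame namespace used above:

```clojure
(defn event->outputs
  "Pure mapping from one websocket event to an outputs map {out-id [msgs]};
  nil when nothing should be emitted."
  [event]
  (case (:type event)
    :speech-started {:out [{:frame/type :user-speech-start}]}
    :utterance-end  {:out [{:frame/type :user-speech-stop}]}
    :transcript     {:out [{:frame/type (if (:final? event)
                                          :transcription
                                          :transcription-interim)
                            :frame/data (:text event)}]}
    nil))
```

The reader proc could then return `[state (event->outputs event)]`, and all the event-handling logic is testable without a socket.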
I mean it is smashed together, that is a socket
A process is a thing with some state that communicates over input and output channels, a socket is a ...
Right, you can even continue smashing more and more stuff together, but I find it useful to treat a connection as two processes, a source and a sink.
> A process is a thing with some state that communicates over input and output channels
You could use this definition to describe any arbitrarily large program, but that doesn't mean you should.
Reminds me, re the reply-to stuff: processes created with process or step-process don't have access to their own pid in the flow, so they can't include it in messages for reply-to
Sockets, as an abstraction that already exists(I didn't smash them together), are a process
The reading and writing of a socket aren't coupled together. Is there a problem with keeping the reading and writing of a socket decoupled?
Unclear what decoupling means, Socket as a thing packages state and an input and an output
The question is how to connect one to a flow
Logically it has all the same parts as a process, so you can model it as one in a flow
That doesn't stop you from then having separate processes in the flow for reading and writing
There is also some ambiguity about "process", since it is common to refer to go loops or threads in the abstract sense as processes, and flows bring their own concrete notion of a process
So you can have a flow process that is internally two threads reading and writing data to a socket
So when I was using the word process, I was referring to reading and writing having their own proc definition.
> So you can have a flow process that is internally two threads reading and writing data to a socket
My idea would be to avoid managing my own threads and use the available core.async.flow tools to manage all of the threads for me.
What is the benefit? Process inputs and outputs are already segregated and addressable separately in :conns
I think not managing my own threads is useful. It allows me to delegate and make policy decisions about threads at a higher level.
If I do want to manage all my own threads, I'll just use core.async directly.
splitting the input and output of a socket isn't going to prevent you from needing a thread to do the reading and writing
Right, but it allows me to tell core.async.flow that I have two I/O procs and let it manage the threads for me.
I also get error handling and reporting for reading and writing separately.
that is interesting, I haven't dug much into the different ways io, compute, and mixed procs are executed
I've built a few prototypes of factorio-style flow-based programming and it's kind of fun. Not sure how much factorio you've played, but it's kind of neat to replace mining equipment -> conveyor belts -> assembly machines with server socket -> channels -> (network output, send emails, write to files).
I have a little chat server I've been using to prototype my process cloning thing
I unfortunately chose to use nio, so a mess
The dynamic, user interactive use cases seem like they might be out of scope for core.async.flow, but I think many of the ideas would still apply.
Having played a bunch of factorio and made factorio-like prototypes, I don't think the dynamic case is that different from the static case, but I still haven't fully dug into the details yet.
yeah, we'll see, those have just been my primary use case for core.async in the past
(chat systems modeling lots of different parts of it as processes and that come and go)
> I would break the process down into simpler pieces:
> An :inject process that injects new messages from the websocket
> An output process that writes to the websocket
> various intermediate processes
:inject has been renamed to :introduce
Thank you for the reply. Here are my thoughts:
I don't like this approach. The example I showed above is a small part of a bigger pipeline for realtime voice AI, and most of those processes are of this nature. The flow is:
audio in -> Transcription (speech to text) -> LLM processing text -> Sentence assembler of LLM text chunk tokens -> Text to speech processor -> Audio out
As you can tell, most of those processes receive input, communicate with an async external process, and send output (in an async manner) further down the pipeline.
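Since :conns is plain data, a linear pipeline like this is a special case of the graph: consecutive pids wired :out to :in. A hypothetical helper, with pids invented from the stages listed above, plus one extra non-linear edge (hypothetical port names) to show the graph is not limited to a chain:

```clojure
(defn chain-conns
  "Wire each pid's :out to the next pid's :in, in order."
  [pids]
  (mapv (fn [[from to]] [[from :out] [to :in]])
        (partition 2 1 pids)))

(def conns
  (conj (chain-conns [:audio-in :transcriptor :llm :sentence-assembler :tts :audio-out])
        ;; a side channel that a strict pipeline could not express
        [[:transcriptor :sys-out] [:llm :sys-in]]))

(chain-conns [:a :b :c])
;; => [[[:a :out] [:b :in]] [[:b :out] [:c :in]]]
```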
Let's analyze your proposed solution: splitting the async processes into 2 processors, 1 for reading and 1 for writing. Let's say we use the first one for initialization:
(let [state (atom {}) ;; shared between the two procs below
      gdef
      {:procs {:write-transcription
               (flow/process
                {:describe (fn [] {:ins {:in "Channel for audio input frames (from transport-in)"}
                                   :params {:api-key "Api key for deepgram"}
                                   :workload :io})
                 :init (fn [{:keys [api-key]}]
                         (let [ws-output-chan (a/chan 100) ;; ws output will be put on this channel
                               ws (init-conn! api-key ws-output-chan)]
                           (swap! state assoc :ws ws :ws-out-c ws-output-chan)
                           {:ws ws}))
                 :transform (fn [{:keys [ws] :as proc-state} _ frame]
                              (when (and (frame/audio-input-raw? frame) ws)
                                (ws/send! ws (:frame/data frame)))
                              [proc-state nil])})
               :read-transcription
               (flow/process
                {:describe (fn [] {:outs {:out "Channel for transcription frames (from websocket)"}
                                   :workload :io})
                 :introduce
                 (fn [_]
                   (when-let [c (:ws-out-c @state)]
                     (let [event (a/poll! c)]
                       (cond
                         (speech-started-event? event)
                         [nil {:out [(frame/user-speech-start true)]}]
                         (utterance-end-event? event)
                         [nil {:out [(frame/user-speech-stop true)]}]
                         (transcription? event)
                         [nil {:out [(frame/transcription (:transcript event))]}]))))})}}]
  gdef)
> Note: You need to introduce a global state. Initially, I wanted to send the ws-conn through channels between the reader and writer. I suppose this can be changed so that the reader (with `:introduce`) is the one initializing the websocket and sending it via a chan to the writer.
This should work. However, most of the processes in my use case are async. Some of them (like the LLM processor) will send an API request based on the input instead of using a websocket connection, but it is still the same concept: you have to sync them with global state. To achieve this using flows, all my processors would have to be split, and given the async nature of the processors themselves, testing isn't made easier by this abstraction.
What I do like about it is the notion of a graph and not a pipeline.
I see 2 options to support async use cases like this:
1. Create a spi/ProcLauncher that permits processes with an introduce function to have inputs.
2. Create a spi/ProcLauncher that gives processes access to the ins and outs channels within the :init or :transform functions or maybe create a new function called transform-async which gives direct access to the channels
What do you think?
The above code does not represent the solution I was proposing.
> You need to introduce a global state.
Do you? I would definitely try to avoid making my own channels and global state. I would also decouple "what to send" from "how to send it".
Can you provide a small example of what you were proposing?
Yea, I realize my proposal is a bit fuzzy without any actual code. I would like to sit down and try core.async.flow more seriously at some point.
Yeah, exciting stuff
to copy what i said on bluesky: lol 700 lines of code, no tests, no examples, committed directly to master. he's done it again, lads
the code is not the primary artifact here
it's just the first thing you've seen publicly
most of the 700LOC is docstring
the docstring for clojure.core.async.flow/start states that it returns qualified keywords such as ::flow/report-chan when in fact unqualified keys like :report-chan are returned.
(I know this wasn't presented as ready to consume yet and I'm grateful for the early peek at it!)
I think you can sort of break flows down into three parts:
1. Process Control
2. Process Connection
3. Process creation
The big omission in my mind is adding and removing processes from a flow while it runs. I think there are a lot of erlang supervisor tree style approaches that need something like that.
The primary means of process creation, by lifting a function into a process, is a nice pattern to have more formalized. A next step for that kind of thing can often be to drop core.async out altogether (I've done stuff starting with processes wired together with channels, realized I could do the functions-lifted-into-processes thing to remove boilerplate, then realized the functions can be directly composed without the channels)
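That last step can be shown with the dice example from this thread: its dedupe and craps-finder procs are ordinary functions, so they compose as a transducer with no channels at all. A sketch; `craps?` and `craps-xf` are illustrative names, and the same transducer could also be attached to a core.async channel with `(async/chan n craps-xf)`:

```clojure
(defn craps?
  "Same test as the craps-finder proc: does the roll sum to 2, 3 or 12?"
  [roll]
  (contains? #{2 3 12} (apply + roll)))

;; dedupe -> craps-finder, composed directly instead of wired with channels
(def craps-xf (comp (dedupe) (filter craps?)))

(into [] craps-xf [[1 2] [1 2] [3 4] [6 6] [1 1] [1 1]])
;; => [[1 2] [6 6] [1 1]]
```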
I haven't dug too much into the process connection bits, but it is interesting that it isn't just a sort of shared pubsub bus + control channel, it appears to be a lot more general purpose than that
;;=============== play with flow ==============
(require '[clojure.core.async :as async]
         '[clojure.core.async.flow :as flow]
         '[clojure.pprint :as pp])
(defn monitoring [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[val port] (async/alts!! [report-chan error-chan])]
        (if (nil? val)
          (prn "========= monitoring shutdown")
          (do
            (prn (str "======== message from " (if (= port error-chan) :error-chan :report-chan)))
            (pp/pprint val)
            (recur))))))
  nil)
(defn ddupe
  ([] {:ins {:in "stuff"}
       :outs {:out "stuff w/o consecutive dupes"}})
  ([_] {:last nil})
  ([{:keys [last]} _ v]
   [{:last v} (when (not= last v) {:out [v]})]))
(def gdef
  {:procs
   {:dice-source
    {:proc (flow/process
            {:describe (fn [] {:outs {:out "roll the dice!"}})
             :inject (fn [_]
                       (Thread/sleep 200)
                       [nil {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}])})}
    :craps-finder
    {:proc (-> #(when (#{2 3 12} (apply + %)) %) flow/lift1->step flow/step-process)}
    :dedupe
    {:proc (flow/step-process #'ddupe)}
    :prn-sink
    {:proc (flow/process
            {:describe (fn [] {:ins {:in "gimme stuff to print!"}})
             :transform (fn [_ _ v] (prn v))})}}
   :conns
   [[[:dice-source :out] [:dedupe :in]]
    [[:dedupe :out] [:craps-finder :in]]
    [[:craps-finder :out] [:prn-sink :in]]]})
(def g (flow/create-flow gdef))
(monitoring (flow/start g))
(flow/resume g) ;;wait a bit for craps to print
(flow/pause g)
(flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])
(flow/ping g)
(flow/stop g)
I'll try to keep that working as I tweak the naming
there are a lot of design docs, much more important to me than tests at this stage, but there will be plenty of tests
this is very exciting
dynamic modification is still TBD - it's pretty easy to get a gross mutable monster. You won't need it for hot reload, reply-to or parallelization
Retirement sounds pretty sweet!
I didn't realize there was a thread here when I commented on reddit, so hopefully this question isn't redundant. I don't actually know how to use reddit for discussion. I was wondering if cycles will be supported in flow graphs.
> 'flow' - a directed graph of processes communicating via channels
Based on the definition, it doesn't seem like they're explicitly disallowed. Cycles seem potentially useful as a mechanism for implementing retries (e.g. failures are optionally sent upstream in the flow if there haven't been any recent failures).
The docs on process call out the ability to send messages to specific pids for "reply-to" which seems to imply some amount of cyclicity
My follow up question is if cycles are allowed, are there other reasons for the following restriction: No key may be present in both :ins and :outs.
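A sketch of the retry idea that also respects that restriction: the proc's `:retry` out is wired to its own, differently named `:again` in, so the cycle lives in :conns as `[[:worker :retry] [:worker :again]]`. `worker-step`, `max-attempts`, the `:sink` pid and the job shape are hypothetical. A self-cycle like this can deadlock if the proc blocks writing to its own full input, so a real version would more likely route retries through another proc:

```clojure
(def max-attempts 3)

(defn worker-step
  "Hypothetical step fn. do-work returns {:ok result} or {:error e}."
  [do-work]
  (fn
    ([] {:ins {:in "new jobs" :again "jobs coming back around the cycle"}
         :outs {:out "results" :retry "failed jobs to try again" :dead "jobs we gave up on"}})
    ([_args] {})
    ([state _in {:keys [attempt] :or {attempt 1} :as job}]
     (let [{:keys [ok error]} (do-work job)]
       (cond
         (some? ok)               [state {:out [ok]}]
         (< attempt max-attempts) [state {:retry [(assoc job :attempt (inc attempt))]}]
         :else                    [state {:dead [(assoc job :error error)]}])))))

(def conns
  [[[:worker :retry] [:worker :again]]   ; the direct cycle
   [[:worker :out] [:sink :in]]])
```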
Thank you for this, @richhickey! I think it will prove extremely valuable for people doing "data engineering" with Clojure. If I'd had flow a few years back, I think it would have https://clojurians.slack.com/archives/C0BQDEJ8M/p1737022016172559?thread_ts=1737018018.669679&channel=C0BQDEJ8M&message_ts=1737022016.172559 to give it an earnest look. Clojure is severely underappreciated for "data engineering," as people often use visual DAG tools or other languages like Python or Scala due to existing libraries and "easy" access to Java libs like Hadoop, Spark, Kafka, etc.
f/ping seems to have gone wonky with the most recent flow, I get some ping results back over the results channel, and then 1 just gets returned from f/ping, and the results from the channel and the result returned from f/ping together are not results from all the running processes
ah, it changed to collect ping results into a map and return it
and the way it does that runs afoul of the fact that I have some extra stuff responding to pings
I'll just have to rethink how I am responding to pings to align with what's expected
@hiredman Any code shared to see how others are using flow is greatly appreciated. I don't use pings at all currently
I just fire them off at the repl to see what is going on. I have some stuff written to do sort of forking subprocesses, and I've had some problems with previous implementations deadlocking for unknown reasons, and ping was pretty useful for detecting it
I had report-chan just wired up to tap> and a tap-fn pretty printing, so it took me a bit to recognize that the reason I was getting this big chunk of non-pretty printed output, was because ping now has a return value