I’m just starting to dig into core.async for a project and trying to understand the idioms that people use. I’m coming from AT-AT and Aleph/Manifold. The attraction of core.async is being able to spin off little “bots” in go loops, with “straight-line code” instead of breaking everything up into callbacks. I understand the basic CSP model (I’ve read the original CSP papers and books). One question I have is about resetting, cancellation, and working with stateful core.async go blocks and threads in the REPL or editor. For instance, if I have 100 go loops in various states of execution, how do I stop them and reset the system? If I want to change the logic associated with a go loop in the editor and push it into the live REPL, what is the best way to migrate all the currently executing go loops and threads over to the new code? With AT-AT, it’s pretty easy to just cancel every pending job and reset to a baseline. Manifold is a bit more difficult. With core.async, the best way to do this isn’t clear. Do I need to build a global “reset” channel (maybe a mult) into every go loop and send everybody a special :reset message of some sort? Or is there a better way? Ideally, I wouldn’t restart my whole REPL on every code change, and there would be a graceful way to develop incrementally.
You would have to build the global reset. You can also check out https://clojure.github.io/core.async/flow.html which is an attempt to offer these sorts of things.
For more actor-like usages, it can be useful to standardize on a set of channels that every go loop receives when it starts, such as in, out, and signal.
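A minimal sketch of that idea, assuming a hypothetical `start-bot` helper and a single global `control` channel (both names are illustrative). Each go loop taps a `mult` of `control` with its own signal channel, so one put on `control` stops every loop that has tapped it. Only standard `clojure.core.async` functions are used.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical global control channel; every bot taps the mult with its own signal chan.
(def control (async/chan))
(def control-mult (async/mult control))

(defn start-bot
  "Hypothetical helper: applies f to each value from in and puts the result
  on out, until something arrives on its signal channel or in closes.
  Returns the go block's channel, which yields the reason it stopped."
  [f in out]
  (let [signal (async/chan 1)]
    (async/tap control-mult signal)
    (async/go
      (let [reason (loop []
                     ;; :priority true checks signal before in on every iteration
                     (let [[v port] (async/alts! [signal in] :priority true)]
                       (cond
                         (= port signal) :stopped
                         (nil? v) :input-closed
                         ;; note: a slow consumer on out delays noticing the signal
                         :else (do (async/>! out (f v))
                                   (recur)))))]
        (async/untap control-mult signal)
        reason))))
```

Closing `control` would also stop every bot, because `tap` closes tapped channels when the source closes by default; you would then need a fresh control channel. For the "push new logic into the REPL" part, passing a var such as `#'handle` instead of the fn value means each call goes through the var's current root binding, so re-evaluating `(defn handle ...)` changes what running loops do. Loops whose structure changes still need to be stopped and restarted.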
@smith.adriane, I was afraid of that, but thanks. Are there standard idioms for that anywhere? Or does everybody just make it up as they go? Given the reference to flow, I’m guessing everybody used to make it up and flow is an attempt to try to provide a standard pattern and infrastructure for doing it. Do I have that about right?
That matches my understanding.
There are other "flow-like" libraries, but I'm not sure they ever got much traction. See https://clojurians.slack.com/archives/CKKPVDX53/p1707263085617019?thread_ts=1706211426.129119&cid=CKKPVDX53
@smith.adriane, thanks!
With async flow, I really like the way it standardizes my processes and helps with inspection, monitoring, and administration. However, it seems like I can't create producer processes "inside" of a flow. It seems like all producer processes are required to use external ::flow/in-ports chans. All the flow examples of producers I could find use external ::flow/in-ports as well.
More details in 🧵
I'm doing the same thing for producers: I start the threads in init.
https://github.com/shipclojure/simulflow/blob/b3127dec199933fae00200b334ab5db68bdd6318/src/simulflow/processors/deepgram.clj#L133
Over time my thinking changed, and depending on the context I think it is a bit better to start your external producers in transition rather than in init. This is because it's often better to start your threads on flow/resume: init is always called on flow/start, and if the threads produce a lot before you call flow/resume on your graph, they can overproduce.
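A rough sketch of that approach, not the linked code: the process creates its in-port channel in `init` but only starts the producing thread the first time it sees `::flow/resume`. The `:produced` port name, the `:started?` flag, and the counting loop are assumptions for illustration.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.flow :as flow])

(def resume-started-producer
  {:describe (fn [] {:outs {:out "Produced values"}})
   ;; create the channel up front so flow can read it as an in-port,
   ;; but don't start producing yet
   :init (fn [_args]
           {::flow/in-ports {:produced (async/chan 10)}})
   :transition
   (fn [state transition]
     (let [ch (get-in state [::flow/in-ports :produced])]
       (cond
         ;; first resume: start the producer thread exactly once
         (and (= transition ::flow/resume) (not (:started? state)))
         (do (async/thread
               (loop [n 0]
                 ;; a put on an already-closed ch returns false, ending the loop
                 (when (async/>!! ch n)
                   (recur (inc n)))))
             (assoc state :started? true))

         (= transition ::flow/stop)
         (do (async/close! ch) state)

         :else state)))
   :transform (fn [state _cid v] [state {:out [v]}])})
```

In a graph it would be wrapped the same way as the other examples in this thread, e.g. `(flow/process (flow/map->step resume-started-producer))`. The `:started?` guard keeps a pause/resume cycle from starting a second thread.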
I don't feel it is awkward, but to each his own, I guess.
Here's another example with a timer process so you can describe "Every X time do or check for stuff" as a pure function:
https://github.com/shipclojure/simulflow/blob/b3127dec199933fae00200b334ab5db68bdd6318/src/simulflow/processors/activity_monitor.clj#L98
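A minimal sketch of the "timer as input, logic as a pure function" pattern, not the linked simulflow code. A hypothetical `start-timer` thread feeds ticks into what would be a `:timer` in-port, and the decision logic lives in a hypothetical pure `activity-transform` that can be tested without running a flow. The message shape (`{:now-ms ...}`) and key names are assumptions.

```clojure
(require '[clojure.core.async :as async])

(defn start-timer
  "Hypothetical helper: puts {:now-ms t} on ch every interval-ms until ch closes."
  [ch interval-ms]
  (async/thread
    (loop []
      (Thread/sleep (long interval-ms))
      (when (async/>!! ch {:now-ms (System/currentTimeMillis)})
        (recur)))))

(defn activity-transform
  "Hypothetical pure transform. :activity messages record the time of the
  last activity; :timer messages emit an :idle event on :out once no
  activity has been seen for :timeout-ms."
  [{:keys [last-activity-ms timeout-ms] :as state} cid {:keys [now-ms]}]
  (case cid
    :activity [(assoc state :last-activity-ms now-ms) nil]
    :timer (if (>= (- now-ms last-activity-ms) timeout-ms)
             ;; reset the clock so :idle isn't re-emitted on every tick
             [(assoc state :last-activity-ms now-ms)
              {:out [{:event :idle :at now-ms}]}]
             [state nil])))
```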
Another type of "producer" I built was to give a processor a channel as a param, have input data put on that channel, and make the processor treat that channel as normal input via ::flow/in-ports.
https://github.com/shipclojure/simulflow/blob/b3127dec199933fae00200b334ab5db68bdd6318/src/simulflow/transport/in.clj#L81
With this setup you can basically do any type of producer you want and you link it with the flow through that input channel
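A minimal sketch of that setup, not the linked code: a hypothetical `channel-source` process whose `init` hands a caller-supplied channel to flow via `::flow/in-ports`. In a graph, the channel would be passed through the proc's `:args` (e.g. `{:proc (flow/process (flow/map->step channel-source)) :args {:source-ch ext-ch}}`); the param and port names are assumptions.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.flow :as flow])

(def channel-source
  {:describe (fn [] {:params {:source-ch "Channel that external code puts values on"}
                     :outs {:out "Values read from :source-ch"}})
   ;; hand the caller-supplied channel to flow as an extra input port
   :init (fn [{:keys [source-ch]}]
           {::flow/in-ports {:source source-ch}})
   ;; forward every value unchanged
   :transform (fn [state _cid v] [state {:out [v]}])})
```

Whatever runs outside the flow (a thread, a callback, a websocket handler) only needs to put onto `source-ch`.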
> With this setup you can basically do any type of producer you want and you link it with the flow through that input channel
@ovidiu.stoica1094 I'm not worried that some producers can't be hooked into flow. What I'm worried about is that you have to work a lot harder to have producers benefit from the monitoring, inspection, error reporting, and administration that you otherwise get for free. For example, here's the sort of code I would like to write:
```clojure
;; proposed API: ::flow/produce is not part of flow today
(def dice-producer
  {:describe (fn [] {:outs {:out "Outputs"}})
   :init (fn [m]
           ;; tell flow to repeatedly call :transform after outputs have been sent
           (assoc m ::flow/produce true))
   :transform
   (fn [state in val]
     [state {:out [(inc (rand-int 6))]}])})
```
```clojure
;; also assumes the proposed ::flow/produce key
(require '[clojure.java.io :as io])

(def file-line-producer
  {:describe (fn [] {:outs {:line "Lines of a file"}
                     :ins {:filename "java.io.File to read from"}
                     :workload :io})
   :init (fn [m] m)
   :transition (fn [state transition]
                 (when (= transition ::flow/stop)
                   (when-let [^java.io.BufferedReader rdr (:reader state)]
                     (.close rdr)))
                 ;; transition must return the (possibly updated) state
                 state)
   :transform
   (fn [state in val]
     (case in
       :filename
       (let [rdr (io/reader val)
             next-state (assoc state
                               :reader rdr
                               ::flow/produce true
                               ;; don't accept another filename
                               ;; until we're done with this one
                               ::flow/input-filter (fn [cid]
                                                     (not= cid :filename)))]
         [next-state nil])
       ;; produce next line
       (let [^java.io.BufferedReader rdr (:reader state)
             line (.readLine rdr)]
         (if line
           [state {:line [line]}]
           ;; end of file: stop producing and accept filenames again
           (let [next-state (dissoc state
                                    :reader
                                    ::flow/produce
                                    ::flow/input-filter)]
             (.close rdr)
             [next-state nil])))))})
```
Not only is this less code than creating external threads and channels, but I would also get all the monitoring, inspection, administration, error reporting, etc from flow "for free".
Just looking at your examples:
• Is the process state inspectable via flow/ping?
• What happens if there is an exception? Does the exception show up in :error-chan?
• The flow config parameters of :mixed-exec/:io-exec/:compute-exec are ignored
• Decisions about thread pools and channel buffers are made inside the logic of functions rather than in data.
I suppose I understand where you are coming from, but essentially, you are giving up control for the convenience of observability.
I think that’s a great trade-off, but I’m also not convinced that your processes and other similar use cases couldn’t be written in a similar style to my example using less code while also getting more observability.
Two example producer processes that I'm trying to port to flow:
```clojure
;; both loops run on a real thread, so they use blocking ops (>!!, alts!!)

;; simple producer
(loop [state initial-state]
  (let [[next-state msg] (step state)]
    (when (async/>!! ch msg)
      (recur next-state))))

;; dynamic producer
;; `ports` can change the producing behavior while running
(loop [state initial-state]
  (let [[val port] (async/alts!! ports :default nil)
        [next-state msg] (step state port val)]
    (when (async/>!! ch msg)
      (recur next-state))))
```
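To try the simple producer on its own, here's a runnable version under assumed names: `run-producer` is a hypothetical wrapper, and any `step` works, e.g. `(fn [s] [(inc s) (* s s)])`. It runs the loop on `async/thread` so the blocking `>!!` is appropriate, and its result channel yields how many messages were accepted before the channel was closed.

```clojure
(require '[clojure.core.async :as async])

(defn run-producer
  "Runs the 'simple producer' loop on its own thread. step is a fn of
  state -> [next-state msg]. Returns a channel that yields the number of
  messages put before ch was closed."
  [ch step initial-state]
  (async/thread
    (loop [state initial-state
           n 0]
      (let [[next-state msg] (step state)]
        ;; >!! blocks until a consumer takes (back pressure) and
        ;; returns false when ch is already closed
        (if (async/>!! ch msg)
          (recur next-state (inc n))
          n)))))
```

With an unbuffered `ch`, the producer never gets more than one message ahead of the consumer, which is the back-pressure behavior the flow versions below try to preserve.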
The step function from the dynamic producer is almost the same as the transform arity of flow processes. The only difference is that for flow processes, there's no way to get the transform function to recur after all the messages have been sent.
I was able to mimic the simple producer in flow:
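```clojure
(def simple-producer-proc
  {:describe (fn [] {:params {:initial-state "Initial state passed to step"
                              :step "function of [state] -> [next-state next-value]"}
                     :outs {:out "Outputs"}})
   :init (fn [{:keys [initial-state step] :as m}]
           (let [value-chan (async/chan)]
             ;; producer thread; >!! blocks until the proc reads (back pressure)
             (async/thread
               (loop [state initial-state]
                 (let [[next-state next-value] (step state)]
                   (when (async/>!! value-chan [next-state next-value])
                     (recur next-state)))))
             (assoc m ::flow/in-ports {:value value-chan})))
   ;; close the channel on stop so the thread's next >!! returns false
   :transition (fn [state transition]
                 (when (= transition ::flow/stop)
                   (async/close! (get-in state [::flow/in-ports :value])))
                 state)
   :transform
   (fn [state in [next-state val]]
     ;; keep the proc state (and its ::flow keys); record the step state for inspection
     [(assoc state :step-state next-state) {:out [val]}])})
```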
It feels awkward to create extra channels and threads for this use case.
Similarly, it should be possible to recreate the dynamic producer as well, but I haven't tried it yet.
You can almost achieve a flow-managed producer by writing to an input channel of the proc, but afaict, it's not going to work, since there's no way to guarantee that the internal write happens after the external write.
```clojure
(def producer-proc
  {:describe (fn [] {:params {:initial-state "Initial state passed to step"
                              :step "function of [state] -> [next-state next-value]"}
                     :outs {:out "Outputs"}
                     :ins {:recur "internal recur target"}})
   :init (fn [{:keys [initial-state step] :as m}]
           (assoc m :step-state initial-state))
   :transform
   (fn [{:keys [step-state step] :as state} in val]
     (let [[next-state next-value] (step step-state)]
       [(assoc state :step-state next-state)
        {:out [next-value]
         ;; write to this proc's own :recur input to trigger the next step
         [(::flow/pid state) :recur] [next-value]}]))})
```
In this example, the write to the internal :recur channel id may happen before the write to :out, which causes the producer to somewhat ignore back pressure and get ahead of the consumer.
Here is another (contrived) example of using ::flow/in-ports - https://github.com/puredanger/flow-example/blob/main/src/stats.clj
Initially I found the way it works somewhat strange (especially that the channels aren't available when inspecting the graph).
::flow/produce is a neat idea! I'd love that as an alternative way to hook up the simple things, especially regarding readability.
The verbosity of juggling ::flow/in-ports and the additional channels/cleanup held me back from fully committing to flow just yet.
I guess it's an "annoyance" that hits you most when starting out, as "in the beginning, there were the producers..." ;)
I've been trying it locally and it feels good. Warning! This code relies on an implementation detail that allows you to specify the order of outputs, which the docs don't explicitly allow.
```clojure
(defn wrap-producer
  "Given a process map, return a process map that lets the process recur
  while its state contains a truthy ::produce key."
  [{:keys [describe init transition transform]}]
  {:describe describe
   :init
   (fn [m]
     ;; one channel serves as both an extra input (::kickstart) and an extra
     ;; output (::recur), so the proc can send itself a message
     (let [ch (async/chan (async/sliding-buffer 1))]
       (-> (if init (init m) m)
           (update ::flow/in-ports assoc ::kickstart ch)
           (update ::flow/out-ports assoc ::recur ch))))
   :transition
   (fn [state status]
     (let [state (if transition
                   (transition state status)
                   state)
           kickstart-ch (get-in state [::flow/in-ports ::kickstart])]
       ;; on resume, kick off the first :transform call
       (when (and (::produce state)
                  (= status ::flow/resume))
         (async/>!! kickstart-ch true))
       (when (= status ::flow/stop)
         (async/close! kickstart-ch))
       state))
   :transform
   (fn [state in msg]
     (let [[state outs] (transform state in msg)
           ;; relies on outputs being sent in order, so ::recur goes last
           outs (if (::produce state)
                  (conj (into [] outs)
                        [::recur [true]])
                  outs)]
       [state outs]))})
```
```clojure
;; example usage
(def dice-producer
  (wrap-producer
   {:describe (fn [] {:outs {:out "Outputs"}})
    :init (fn [m]
            ;; tell flow to repeatedly call :transform after outputs have been sent
            (assoc m
                   ::produce true
                   :n 0))
    :transform
    (fn [{:keys [n] :as state} in val]
      [(update state :n inc) {:out [n]}])}))
```
Sorry I don't have the capacity to really think about it and look at your code in detail right now, but can you pull these producers out and use https://clojurians.slack.com/archives/C05423W6H/p1760554689180219?thread_ts=1760554689.180219&cid=C05423W6H
I'm not sure what you mean by "can you pull these producers out". Anyway, the broadcast mechanism is not suitable for this use case. For my example, producers support back pressure.
Aha, sorry and disregard then. Having time to read would've helped.
That's awesome! Just looking at the wrap, I thought there might be a problem with flow/pause, but it's all working as it should [I need to check what resume/pause actually do at the lib level; it seems I misunderstood them the whole time]. Dicing away on 1.9.829-alpha2:
EDIT: changed as per @phronmophobics suggestion below - makes much more sense this way :)
```clojure
(def dice-producer
  (wrap-producer
   {:describe (fn [] {:outs {:out "Outputs"}})
    :init (fn [m]
            ;; tell flow to repeatedly call :transform after outputs have been sent
            (assoc m
                   ::produce true
                   :n 0))
    :transform
    (fn [{:keys [n] :as state} in val]
      [(update state :n inc) {:out [n]}])}))

(def gdef
  {:procs {:dice {:proc (flow/process (flow/map->step dice-producer))}
           :prn {:proc (flow/process
                        (flow/lift1->step (fn [x] (Thread/sleep 1000) (prn x))))}}
   :conns [[[:dice :out] [:prn :in]]]})

(def fl (flow/create-flow gdef))
(flow/start fl)
(flow/resume fl)
(flow/pause fl)
#_(flow/stop fl)

;; 0
;; 1
;; 2
;; 3
```
Feels so much clearer, and I'd guess it also improves the reusability/testability of the :transform function by itself.
yea, and if you change your prn process to (fn [x] (Thread/sleep 1000) (prn x)), you can see the back pressure in action.
I'll change ::produce to ::flow/produce and use that wrapper for all producer-procs in my current toy-project, hoping that the implementation detail you used becomes a feature some day. Thank you! You reignited the flow-spark in me :)