Iโve just started experimenting with c.a.flow, I like it so far! Iโm trying to connect an external process into a flow and am looking for some feedback on my code. Not sure Iโm doing it right. Code in ๐งต . Any feedback is appreciated!
Thanks, I got it working so far ๐
(defn start-external-process
"Start an external process using clojure.java.process/start
and connects its stdin, stdout and stderr to the respective
channels."
[opts args stdin-chan stdout-chan stderr-chan stop-atom]
(let [process (apply process/start opts args)
writer (.outputWriter process)
reader (.inputReader process)
error (.errorReader process)]
(a/io-thread
(loop []
(let [put (>!! stdout-chan (.readLine reader))]
(when (and put (.isAlive process) (not @stop-atom))
(recur)))))
(a/io-thread
(loop []
(let [put (>!! stderr-chan (.readLine error))]
(when (and put (.isAlive process) (not @stop-atom))
(recur)))))
(a/io-thread
(loop []
(when (and (.isAlive process) (not @stop-atom))
(let [val (<!! stdin-chan)]
(.write writer val)
(.newLine writer)
(.flush writer))
(recur))))
process))
(defn external-process
;; describe
([] {:params {:opts "Options for clojure.java.process/start"
:args "Arguments for clojure.java.process/start"}
:ins {:stdin "Write to stdin of the process"}
:outs {:stdout "Output from stdout of the process"
:stderr "Output from stderr of the process"}})
;; init
([args]
(assoc args
::flow/in-ports {:proc-stdout (chan 10)
:proc-stderr (chan 10)}
::flow/out-ports {:proc-stdin (chan 10)}
:stop (atom false)))
;; transition
([{:keys [opts args ::flow/in-ports ::flow/out-ports] :as state} transition]
(case transition
(::flow/start ::flow/resume)
(let [stop-atom (atom false)
process (start-external-process opts args (:proc-stdin out-ports) (:proc-stdout in-ports) (:proc-stderr in-ports) stop-atom)
pid (.pid process)]
(assoc state :stop stop-atom :process process :pid pid))
(::flow/stop ::flow/pause)
(do
(reset! (:stop state) true)
(.destroy (:process state))
(dissoc state :process :pid))))
;; transform
([state in msg]
(case in
:stdin ; feed a message to the process
[state {:proc-stdin [msg]}]
:proc-stdout ; receive a message from the process
[state {:stdout [msg]}]
:proc-stderr ; receive an error from the process
[state {:stderr [msg]
::flow/report [msg]}])))Iโm mostly struggling with start-external-process. Forwarding the streams to chans and vice versa seems to be working, but how do I handle pausing/stopping/starting correctly.
And is there a good way to handle the external process crashing?
LGTM! Some things I learned the hard way: โข transition fn should always return state map โข External processes/connections are better to start in a flow/resume transition as the processors are not yet active in the start state, so you risk overflowing/blocking unbuffered channels This is not the case for your code but just in general
Could someone give an example of a process that consumes :source-chan for the following flow definition? From the Flow Guide under https://clojure.github.io/core.async/flow-guide.html#flow-def:
> An example flow definition might look like this for a flow with two procs where the in-chan and out-chan are being passed through the source and sink args:
>
{:procs {:source-proc {:proc (process #'source-fn)
> :args {:source-chan in-chan}}
> :sink-proc {:proc (process #'sink-fn)
> :args {:sink-chan out-chan}}}
> :conns [ [[:source-proc :out] [:sink-proc :in]] ]}do you mean abstractly or concretely?
I mean concretely. For example how would the source process in your https://github.com/puredanger/flow-example/blob/main/src/stats.clj example look like if the random integer came from an external channel?
I have the feeling that consuming :source-chan in a go-loop or in a future is wrong and that it should be possible to access each message from :source-chan directly in the transform step function.
you will typically use flow/in-ports to return the source-chan in your initial state returned from the init method, and then the proc will only invoke the step-fn when it has an input message
so the step-fn init method will look like: ([args] {flow/in-ports {:source (:source-chan args)}})
and then the transform arity will receive a message from :source
Thanks, that works! ๐
(defn source-fn
;; describe
([] {:params {:source-chan "Source channel"}
:outs {:out "Output channel"}})
;; init
([state] (assoc state
::flow/in-ports {:source (:source-chan state)}))
;; transition
([state _transition] state)
;; transform
([state in msg] [state (when (= in :source)
{:out [msg]})]))