core-async

jlmr 2025-07-28T09:19:08.794869Z

I’ve just started experimenting with c.a.flow, I like it so far! I’m trying to connect an external process into a flow and am looking for some feedback on my code. Not sure I’m doing it right. Code in 🧵. Any feedback is appreciated!

jlmr 2025-07-30T15:14:56.037289Z

Thanks, I got it working so far 🙂

jlmr 2025-07-28T09:19:14.843749Z

(ns example.external-process ; hypothetical namespace name
  (:require [clojure.core.async :as a :refer [chan <!! >!!]]
            [clojure.core.async.flow :as flow]
            [clojure.java.process :as process]))

(defn start-external-process
  "Starts an external process using clojure.java.process/start
   and connects its stdin, stdout and stderr to the respective
   channels."
  [opts args stdin-chan stdout-chan stderr-chan stop-atom]
  (let [process (apply process/start opts args)
        writer (.outputWriter process)
        reader (.inputReader process)
        error (.errorReader process)]
    ;; stdout -> stdout-chan
    (a/io-thread
     (loop []
       ;; .readLine returns nil at end of stream, and nil cannot be put on a channel
       (when-let [line (.readLine reader)]
         (when (and (>!! stdout-chan line) (not @stop-atom))
           (recur)))))
    ;; stderr -> stderr-chan
    (a/io-thread
     (loop []
       (when-let [line (.readLine error)]
         (when (and (>!! stderr-chan line) (not @stop-atom))
           (recur)))))
    ;; stdin-chan -> stdin
    (a/io-thread
     (loop []
       (when (and (.isAlive process) (not @stop-atom))
         ;; <!! returns nil once stdin-chan is closed
         (when-let [val (<!! stdin-chan)]
           (.write writer val)
           (.newLine writer)
           (.flush writer)
           (recur)))))
    process))

(defn external-process
  ;; describe
  ([] {:params {:opts "Options for clojure.java.process/start"
                :args "Arguments for clojure.java.process/start"}
       :ins {:stdin "Write to stdin of the process"}
       :outs {:stdout "Output from stdout of the process"
              :stderr "Output from stderr of the process"}})

  ;; init
  ([args] 
   (assoc args
          ::flow/in-ports {:proc-stdout (chan 10)
                           :proc-stderr (chan 10)}
          ::flow/out-ports {:proc-stdin (chan 10)}
          :stop (atom false)))

  ;; transition
  ([{:keys [opts args ::flow/in-ports ::flow/out-ports] :as state} transition]
   (case transition
     (::flow/start ::flow/resume)
     (let [stop-atom (atom false)
           process (start-external-process opts args (:proc-stdin out-ports) (:proc-stdout in-ports) (:proc-stderr in-ports) stop-atom)
           pid (.pid process)]
       (assoc state :stop stop-atom :process process :pid pid))

     (::flow/stop ::flow/pause)
     (do
       (reset! (:stop state) true)
       ;; :process is absent when the proc is stopped while already paused
       (some-> (:process state) .destroy)
       (dissoc state :process :pid))))

  ;; transform
  ([state in msg] 
   (case in
     :stdin ; feed a message to the process
     [state {:proc-stdin [msg]}]

     :proc-stdout ; receive a message from the process
     [state {:stdout [msg]}]

     :proc-stderr  ; receive an error from the process
     [state {:stderr [msg]
             ::flow/report [msg]}])))

jlmr 2025-07-28T09:35:42.536229Z

I’m mostly struggling with start-external-process. Forwarding the streams to chans and vice versa seems to be working, but how do I handle pausing/stopping/starting correctly? And is there a good way to handle the external process crashing?

Ovi Stoica 2025-07-28T09:37:05.436059Z

LGTM! Some things I learned the hard way:
• transition fn should always return state map
• External processes/connections are better to start in a flow/resume transition as the processors are not yet active in the start state, so you risk overflowing/blocking unbuffered channels
This is not the case for your code but just in general
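
A minimal sketch of those two points, not taken from the thread: the names `resource-transition`, `:open!`, `:close!` and `:resource` are hypothetical, and the open/close functions are assumed to be supplied in the state map. The resource is opened only on ::flow/resume, closed on ::flow/pause or ::flow/stop, and every branch returns a state map.

```clojure
;; :as-alias only creates the alias for ::flow/... keywords and does not
;; load the namespace (assumes Clojure 1.11 or newer).
(require '[clojure.core.async.flow :as-alias flow])

(defn resource-transition
  "Hypothetical transition arity. Expects :open! (no-arg fn) and
   :close! (one-arg fn) in the state map; both names are assumptions."
  [{:keys [open! close! resource] :as state} transition]
  (case transition
    ;; open lazily, and only once, when the flow is resumed
    ::flow/resume
    (if resource
      state
      (assoc state :resource (open!)))

    ;; release the resource, tolerating a stop that follows a pause
    (::flow/pause ::flow/stop)
    (if resource
      (do (close! resource)
          (dissoc state :resource))
      state)

    ;; any other value: return the state unchanged, never nil
    state))
```

Called as `(resource-transition state ::flow/resume)`, it returns the state with `:resource` added. The default branch is there so an unexpected transition cannot make `case` throw or return nil.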

2025-07-28T19:18:33.355129Z

Could someone give an example of a process that consumes :source-chan for the following flow definition? From the Flow Guide under https://clojure.github.io/core.async/flow-guide.html#flow-def:

> An example flow definition might look like this for a flow with two procs where the in-chan and out-chan are being passed through the source and sink args:

{:procs {:source-proc {:proc (process #'source-fn)
                       :args {:source-chan in-chan}}
         :sink-proc   {:proc (process #'sink-fn)
                       :args {:sink-chan out-chan}}}
 :conns [ [[:source-proc :out] [:sink-proc :in]] ]}

Alex Miller (Clojure team) 2025-07-28T19:38:43.590149Z

do you mean abstractly or concretely?

2025-07-28T19:43:09.616299Z

I mean concretely. For example, what would the source process in your https://github.com/puredanger/flow-example/blob/main/src/stats.clj example look like if the random integer came from an external channel? I have the feeling that consuming :source-chan in a go-loop or in a future is wrong and that it should be possible to access each message from :source-chan directly in the transform step function.

Alex Miller (Clojure team) 2025-07-28T19:45:04.987969Z

you will typically use ::flow/in-ports to return the source-chan in your initial state returned from the init arity, and then the proc will only invoke the step-fn when it has an input message

๐Ÿ™ 1
Alex Miller (Clojure team) 2025-07-28T19:46:30.772569Z

so the step-fn init arity will look like: ([args] {::flow/in-ports {:source (:source-chan args)}})

Alex Miller (Clojure team) 2025-07-28T19:47:22.962719Z

and then the transform arity will receive a message from :source

2025-07-28T19:53:18.675289Z

Thanks, that works! 🙂

(defn source-fn
  ;; describe
  ([] {:params {:source-chan "Source channel"}
       :outs {:out "Output channel"}})

  ;; init
  ([state] (assoc state
                  ::flow/in-ports {:source (:source-chan state)}))

  ;; transition
  ([state _transition] state)

  ;; transform
  ([state in msg] [state (when (= in :source)
                           {:out [msg]})]))

๐Ÿ‘ 2