core-async

Karl Xaver 2025-04-24T12:00:58.586899Z

Is this expected behavior?

(def gdef {:procs {:printer {:proc (flow/process (flow/lift1->step println))}}})
  (def flow (flow/create-flow gdef))
  (monitoring (flow/start flow))
  (flow/resume flow)
  (flow/inject flow [:printer :in] ["testing"])
  ;; ========= monitoring start
  ;; testing
  ;; ========= message from :error-chan
  ;; #:clojure.core.async.flow{:pid :printer,
  ;;                           :status :running,
  ;;                           :state nil,
  ;;                           :count 1,
  ;;                           :cid :in,
  ;;                           :msg "testing",
  ;;                           :op :step,
  ;;                           :ex #error {
  ;;  :cause "can't resolve channel with coord"
  ;;  :data {:coord :out}
  ;;  :via
  ;;  [{:type clojure.lang.ExceptionInfo
  ;;    :message "can't resolve channel with coord"
  ;;    :data {:coord :out}
  ;;    :at [clojure.core.async.flow.impl$create_flow$reify__30145$write_chan__30192 invoke "impl.clj" 135]}]
  ;;  :trace
  ;;  [[clojure.core.async.flow.impl$create_flow$reify__30145$write_chan__30192 invoke "impl.clj" 135]
  ;;   [clojure.core.async.flow.impl$create_flow$reify$reify__30199 get_write_chan "impl.clj" 138]
  ;;   [clojure.core.async.flow.impl$send_outputs invokeStatic "impl.clj" 218]
  ;;   [clojure.core.async.flow.impl$send_outputs invoke "impl.clj" 213]
  ;;   [clojure.core.async.flow.impl$proc$reify__30258$run__30264$fn__30270 invoke "impl.clj" 297]
  ;;   [clojure.core.async.flow.impl$proc$reify__30258$run__30264 invoke "impl.clj" 275]
  ;;   [clojure.lang.AFn applyToHelper "AFn.java" 152]
  ;;   [clojure.lang.AFn applyTo "AFn.java" 144]
  ;;   [clojure.core$apply invokeStatic "core.clj" 667]
  ;;   [clojure.core$apply invoke "core.clj" 662]
  ;;   [clojure.core.async.flow.impl$futurize$fn__30090$fn__30092 invoke "impl.clj" 40]
  ;;   [clojure.lang.AFn call "AFn.java" 18]
  ;;   [java.util.concurrent.FutureTask run "FutureTask.java" 317]
  ;;   [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1144]
  ;;   [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 642]
  ;;   [java.lang.Thread run "Thread.java" 1583]]}}

  (flow/ping flow)
  ;; => {:printer
  ;;     #:clojure.core.async.flow{:pid :printer,
  ;;                               :status :running,
  ;;                               :state nil,
  ;;                               :count 0,
  ;;                               :ins
  ;;                               {:in
  ;;                                {:put-count 0,
  ;;                                 :take-count 1,
  ;;                                 :closed? false,
  ;;                                 :buffer
  ;;                                 {:type FixedBuffer,
  ;;                                  :count 0,
  ;;                                  :capacity 10}}},
  ;;                               :outs {:out nil}}}
   

Alex Miller (Clojure team) 2025-04-24T13:32:50.642089Z

I think the problem here is that lift expects a function that takes and returns values, but println returns nil, which is not a valid channel value.
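The point about nil can be checked on a plain core.async channel, independently of flow. This is a minimal sketch, assuming core.async is on the classpath; `ch` and `nil-put-result` are names made up for illustration, and it says nothing about how lift1->step itself treats a nil return.

```clojure
(require '[clojure.core.async :as a])

;; A buffered channel, so the first put does not block.
(def ch (a/chan 1))

;; An ordinary value is accepted.
(a/>!! ch "testing")

;; nil is rejected: the put throws IllegalArgumentException.
(def nil-put-result
  (try
    (a/>!! ch nil)
    (catch IllegalArgumentException _
      :rejected)))
```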

Karl Xaver 2025-04-24T14:15:42.766319Z

This throws in the same way: #(do (println %) "return-value")

Alex Miller (Clojure team) 2025-04-24T13:33:52.463799Z

But I am confused by the message a bit, will take a look

Karl Xaver 2025-04-24T13:44:11.008229Z

From what I can grasp, it might be happening because the process's unused :out does not get wired up to a channel, yet still exists as {:out nil}:

(flow/ping flow)
  ;; => {:printer
  ;;     #:clojure.core.async.flow{:pid :printer,
  ;;                               :status :running,
  ;;                               :state nil,
  ;;                               :count 0,
  ;;                               :ins
  ;;                               {:in
  ;;                                {:put-count 0,
  ;;                                 :take-count 1,
  ;;                                 :closed? false,
  ;;                                 :buffer
  ;;                                 {:type FixedBuffer,
  ;;                                  :count 0,
  ;;                                  :capacity 10}}},
  ;;                               :outs {:out nil}}}

So the find here gets nil from out-chans:

(defn create-flow (...)
  ...
  write-chan #(if-let [[_ c] (or (find in-chans %) (find out-chans %))]
                c
                (throw (ex-info "can't resolve channel with coord" {:coord %})))
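How `find` and `if-let` interact in a lookup of this shape can be tried in isolation. The sketch below uses only clojure.core. `resolve-chan` and the literal maps are hypothetical stand-ins for the quoted excerpt, not the actual flow internals.

```clojure
(defn resolve-chan
  "Same shape as the quoted write-chan lookup, with the maps passed in
  as arguments (hypothetical helper, for illustration only)."
  [in-chans out-chans coord]
  (if-let [[_ c] (or (find in-chans coord) (find out-chans coord))]
    c
    (throw (ex-info "can't resolve channel with coord" {:coord coord}))))

;; Key present with a nil value: find returns the entry [:out nil], which is
;; truthy, so if-let takes the then-branch and the result is nil. No throw.
(def present-but-nil (resolve-chan {} {:out nil} :out))

;; Key absent from both maps: both finds return nil, so the else-branch throws.
(def absent
  (try
    (resolve-chan {} {} :out)
    (catch clojure.lang.ExceptionInfo e
      (ex-data e))))
```

If the excerpt matches the running code, this suggests the exception is only reached when the coord is missing from both maps. An entry that is present with a nil value would yield nil rather than throw.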