core-async

dominicm 2025-03-20T18:33:07.307929Z

Noodling about with flow. I noticed that if I have a conn which shares a pid then it won't connect the two. Eg going from [:foo :out] [:foo :in] doesn't work.

2025-04-08T17:40:17.886289Z

that works for me here, can you be more specific about “doesn’t work”?

dominicm 2025-04-08T17:40:53.438299Z

The message doesn't get run through foo in, from foo's out

2025-04-08T17:48:23.513079Z

again, that works here

(def gdef
  {:procs
   {:looper {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)}}
   :conns
   [[[:looper :out] [:looper :in]]]})
(def g (flow/create-flow gdef))
(flow/start g)
(flow/resume g)
(flow/inject g [:looper :in] [1])
user=> 1
2
3
4
5
6
7

2025-04-08T17:54:47.196719Z

maybe post a gist or something

dominicm 2025-04-08T18:01:52.091539Z

Interesting. That's quite similar to what I was trying.

2025-04-08T18:05:39.526639Z

the exact behavior will depend on how many output messages are produced for every input, if it is more than one it will eventually fill the buffering capacity(I think the channels are created with a buffer of size 10?) and then deadlock

2025-04-08T18:12:20.764249Z

yeah, well if you configure feedback you have to know what you are doing

2025-04-08T18:12:29.567619Z

but it is possible

2025-03-20T18:40:52.268239Z

a process can't read and write at the same time

dominicm 2025-03-20T18:41:35.621679Z

Ah, it wasn't clear to me from the documentation that each process was responsible for its own connections.

dominicm 2025-03-20T18:41:47.208529Z

I thought a central governor might do that.

2025-03-20T18:47:11.226149Z

a lot of this is kind of of swapped out of my brain at the moment, but yeah as I recall each process is very independent. the central thing sort of figures out the plumbing at the started and starts each processed connected to the channels it needs. I am not sure if it is still the case but there was a little indirection, when things are wired up channels might go through a per channel mult between processes, but there isn't really anything in the middle

👍 1
dominicm 2025-04-09T09:36:43.544859Z

So with this deps:

{:deps
 {org.clojure/core.async
  {:git/sha "830bc3b3fa293510019e0021c19c1689d6f10216"
   :git/url ""}}}
and this flow:
(comment
  (require '[clojure.core.async :as async]
           '[clojure.core.async.flow :as flow]
           '[clojure.pprint :as pp]
           '[clojure.datafy :as d]
           '[clojure.walk :as w])
  (def y (flow/create-flow {:procs
                            {:foo-finder {:proc (flow/process
                                                  {:describe (constantly {:ins {:out "gimme stuff to println"
                                                                                :err "gimme stuff to err"
                                                                                :sleep "stop what you're doing and sleep"}
                                                                          :outs {:slept "a report once slept"}})
                                                   :transform (fn [state chan v]
                                                                (if (= :sleep chan)
                                                                  (do (Thread/sleep v)
                                                                      [{:last-slept v} {:slept [v]}])
                                                                  (.println (case chan
                                                                              :out System/out
                                                                              :err System/err)
                                                                            (str chan ":  " (pr-str v)))))})}}
                            :conns
                            [
                             [[:foo-finder :slept] [:foo-finder :err]]
                             ]}))

  (flow/start y)
  (flow/resume y)
  (flow/stop y)
  (flow/inject y [:foo-finder :sleep] [1])
  (flow/inject y [:foo-finder :err] ["bye?"])
  (flow/inject y [:foo-finder :out] ["bye?"])
  )

dominicm 2025-04-09T09:38:01.785929Z

If I inject to err, it gets immediately changed to :slept, and passed as an :in, so it ends up trying to print on the :slept case which obviously fails because of the case. I'm guessing that's why I see the behaviour I do! Seems strange.

dominicm 2025-04-09T09:44:03.678939Z

I've just tried this on the latest sha and it gives me an exception about PersistentHashMaps being passed too many arguments, so I'm guessing the API has changed somewhere in flow/process edit: found it, there's now a flow/map->step. Now I've found that, I'm cooking again. The same bug is still there, though!

2025-04-09T12:49:58.656159Z

I understand this (the reverse lookup can’t support direct connect aliasing, used in the general 1:1 case, when it’s a feedback loop) and will fix - thanks for the repro!

2025-04-09T12:59:54.765139Z

fix pushed

dominicm 2025-04-09T13:11:57.887549Z

Wonderful, thank you. I'll take a look later!