Noodling about with flow. I noticed that if a conn has the same pid on both ends, it won't connect the two. E.g. going from [:foo :out] to [:foo :in] doesn't work.
that works for me here, can you be more specific about “doesn’t work”?
A message emitted on foo's :out never gets run through foo's :in
again, that works here
(def gdef
  {:procs
   {:looper {:proc (-> #(do (prn %) (when (< % 7) (inc %))) flow/lift1->step flow/process)}}
   :conns
   [[[:looper :out] [:looper :in]]]})
(def g (flow/create-flow gdef))
(flow/start g)
(flow/resume g)
(flow/inject g [:looper :in] [1])
user=> 1
2
3
4
5
6
7
maybe post a gist or something
Interesting. That's quite similar to what I was trying.
the exact behavior will depend on how many output messages are produced for every input. if it is more than one, it will eventually fill the buffering capacity (I think the channels are created with a buffer of size 10?) and then deadlock
yeah, well if you configure feedback you have to know what you are doing
but it is possible
a process can't read and write at the same time
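To make the buffer-filling point concrete, here is a rough sketch using plain core.async rather than flow itself. `steps-until-full` is a hypothetical helper, not part of any library, and the buffer size of 10 is only the guess from above. It models a single process whose outputs land back on its own input channel and reports the step at which an output no longer fits; a real process would block on that put, and since it is also the only reader, it would never unblock.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper, not part of core.async.flow: simulates one process
;; whose outputs are fed back into its own input over one buffered channel.
(defn steps-until-full
  "Each step takes one message and puts `fan-out` messages back on the same
  channel. Returns the 1-based step at which an output no longer fits in the
  buffer, or nil if that never happens within `max-steps`."
  [buf-size fan-out max-steps]
  (let [ch (async/chan buf-size)]
    (async/offer! ch 0)                       ; seed message, standing in for flow/inject
    (loop [step 1]
      (when (<= step max-steps)
        (if-some [v (async/poll! ch)]
          ;; offer! never blocks; it returns true only if the value fit.
          (if (every? true? (repeatedly fan-out #(async/offer! ch (inc v))))
            (recur (inc step))
            step)                             ; a blocking put would hang here
          nil)))))                            ; channel drained, loop ended on its own

(steps-until-full 10 2 100) ; two outputs per input: the buffer fills up
(steps-until-full 10 1 100) ; one output per input: never fills
```

With two outputs per input the backlog grows by one message per step, so the first call returns 10; with one output per input the backlog stays at one message and the second call returns nil.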
Ah, it wasn't clear to me from the documentation that each process was responsible for its own connections.
I thought a central governor might do that.
a lot of this is kind of swapped out of my brain at the moment, but yeah, as I recall each process is very independent. the central thing sort of figures out the plumbing at the start and starts each process connected to the channels it needs. I am not sure if it is still the case, but there was a little indirection: when things are wired up, channels might go through a per-channel mult between processes, but there isn't really anything in the middle
So with these deps:
{:deps
 {org.clojure/core.async
  {:git/sha "830bc3b3fa293510019e0021c19c1689d6f10216"
   :git/url ""}}}
and this flow:
(comment
  (require '[clojure.core.async :as async]
           '[clojure.core.async.flow :as flow]
           '[clojure.pprint :as pp]
           '[clojure.datafy :as d]
           '[clojure.walk :as w])
  (def y (flow/create-flow
          {:procs
           {:foo-finder
            {:proc (flow/process
                    {:describe (constantly {:ins {:out "gimme stuff to println"
                                                  :err "gimme stuff to err"
                                                  :sleep "stop what you're doing and sleep"}
                                            :outs {:slept "a report once slept"}})
                     :transform (fn [state chan v]
                                  (if (= :sleep chan)
                                    (do (Thread/sleep v)
                                        [{:last-slept v} {:slept [v]}])
                                    (.println (case chan
                                                :out System/out
                                                :err System/err)
                                              (str chan ": " (pr-str v)))))})}}
           :conns
           [[[:foo-finder :slept] [:foo-finder :err]]]}))
  (flow/start y)
  (flow/resume y)
  (flow/stop y)
  (flow/inject y [:foo-finder :sleep] [1])
  (flow/inject y [:foo-finder :err] ["bye?"])
  (flow/inject y [:foo-finder :out] ["bye?"])
)
If I inject to :err, the message immediately gets relabelled as :slept and passed in as an input, so the transform ends up in the println branch with chan set to :slept, which obviously fails because the case has no :slept clause.
I'm guessing that's why I see the behaviour I do! Seems strange.
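For anyone wanting to see that failure without the flow wiring, here is a minimal sketch of just the dispatch part of the transform above. `target-stream` and `dispatch-error` are hypothetical names for illustration only; the point is that a `case` with no default clause throws as soon as it is handed an input id it doesn't list.

```clojure
;; Hypothetical stand-in for the (case chan ...) inside the :transform fn.
(defn target-stream [chan]
  (case chan
    :out System/out
    :err System/err))

;; Hypothetical helper: returns the exception message for an input id that
;; the case cannot handle, or nil when the id is handled.
(defn dispatch-error [chan]
  (try
    (target-stream chan)
    nil
    (catch IllegalArgumentException e
      (ex-message e))))

(dispatch-error :err)   ; handled, so no error
(dispatch-error :slept) ; not listed in the case, so it throws
```

The first call returns nil and the second returns "No matching clause: :slept", which is the kind of error you'd expect if a message reaches the transform tagged with the output id instead of the input id.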
I've just tried this on the latest sha and it gives me an exception about a PersistentHashMap being passed too many arguments, so I'm guessing the API has changed somewhere in flow/process
edit: found it, there's now a flow/map->step. Now that I've found that, I'm cooking again. The same bug is still there, though!
I understand this (the reverse lookup can’t support direct connect aliasing, used in the general 1:1 case, when it’s a feedback loop) and will fix - thanks for the repro!
fix pushed
Wonderful, thank you. I'll take a look later!