Can someone show me a simple example of a recursive async pipeline? I can't wrap my head around it
for example, if I have an API where each object has "children" and I need to fetch info for every child and do it recursively, why can't I push things into the channel I'm using in the pipeline?
(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])
(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
   (fn [sibling res]
     (if-let [children (:children sibling)]
       (a/go (a/>! in (first children))) ; why can't I do something like this?
       (a/go
         (a/>! res sibling)
         (a/close! res))))
   in)
  (a/go
    (doseq [sibling hermanos]
      (a/>! in sibling)))
  (a/go-loop []
    (prn (a/<! out))
    (recur)))
I think it gets stuck waiting for res to close for the first few inputs, so that there aren't too many async operations in flight. The following seems to work.
(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])
(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
   (fn [sibling res]
     (if-let [children (:children sibling)]
       (a/go (a/put! in (first children))
             (a/close! res))
       (a/go
         (a/>! res sibling)
         (a/close! res))))
   in)
  (a/go
    (doseq [sibling hermanos]
      (a/>! in sibling)))
  (a/go-loop []
    (prn (a/<! out))
    (recur)))
Yea, I'm not sure this setup is a good idea.
But you can think about it as being sort of like putting a mic next to a speaker with an amplifier between them
@U7RJTCH6J it doesn't seem to work as expected, it's now only traversing children but not root level nodes.
that’s because they have children!
elements only get added to the result chan if they don’t have children
per the function passed in
try rewriting the pipeline function to also add elements [to the results chan] when they have children
Turning it into when-let is not working either. I think I'm fundamentally doing something wrong here. That's why I'm desperately trying to find an example of a recursive async pipeline
(fn [sibling res]
  (if-let [children (:children sibling)]
    (a/go (a/put! in (first children))
          (a/close! res))
    (a/go
      (a/>! res sibling)
      (a/close! res))))
what values are being put onto the res chan?
Thank you. You were right about closing the res channel. Now I need to rewrite this for the actual thing I'm trying to achieve. Thank you, Adrian!
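For reference, a minimal sketch of what the advice above amounts to: the function passed to pipeline-async always puts the node on res and closes it, and hands every child back to in with put! so that it returns immediately. The names family and names-seen, the trimmed data, and the quiet-period timeout are assumptions made for illustration and are not from the thread. Because in is never closed while work is running, out never closes either, so this version simply stops reading once out has been quiet for a while.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical, trimmed-down data in the same shape as hermanos.
(def family [{:name "Mark"
              :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                         {:name "Edward" :children []}]}
             {:name "Peter" :children [{:name "Brandon"}]}])

(defn names-seen
  "Runs `roots` through a feedback pipeline and collects names
  until `out` has been quiet for `quiet-ms` milliseconds."
  [roots quiet-ms]
  (let [in  (a/chan)
        out (a/chan)]
    (a/pipeline-async
     1
     out
     (fn [sibling res]
       ;; Always emit the node itself, then close res so the pipeline can move on.
       (a/put! res (:name sibling))
       (a/close! res)
       ;; Feed every child back in; put! does not wait for a taker.
       (doseq [child (:children sibling)]
         (a/put! in child)))
     in)
    (doseq [root roots]
      (a/put! in root))
    ;; out never closes on its own here, so give up after a quiet period.
    (loop [seen []]
      (let [[v _] (a/alts!! [out (a/timeout quiet-ms)])]
        (if (nil? v)
          (do (a/close! in) seen)
          (recur (conj seen v)))))))
```

Calling (names-seen family 500) should return a vector containing both the root-level names and every descendant. The timeout is only a stopgap for the missing "no more data" signal.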

The doc string is super confusing to me tbh
I had to check the community comments to make sense of it, https://clojuredocs.org/clojure.core.async/pipeline-async
> I think the phrase "af must close! the channel before returning." is confusing, and should be changed to something along the lines of "af must close! the channel before the async operation completes".
the suggested update is also a bit confusing, but slightly less so, but tbf, async code is inherently hard.
Hey friends, I have another pipeline-related question. Right now I have this async pipeline that realizes the children of nodes while fetching from an API (each node may have children, each child node can have its own children, etc., and we need to send a GET request for every child).
Even though I'm using clj-http.client/get (which is non-async), I couldn't figure out how to backfill into the from channel (4th argument of core.async/pipeline) when a node has children, so it never worked with a non-async pipeline. I think I still fundamentally need to use an async pipeline for this job.
Now, the async pipeline does the work, but I have this headache (I remember stumbling upon it a while ago and I still don't know how to deal with it). The fundamental problem is that there's no good way of knowing when "there's no more data to fetch". So, my question is: *what is the idiomatic way of draining a channel coming out of an async pipeline?* I.e., how do I turn an async call into a non-async/blocking one?
If my async pipeline fn returns the to channel, this works as expected, right?
(let [ch (retrieve-all-nodes)]
  (go-loop []
    (when-let [val (<! ch)]
      (println val)
      (recur))))
But if I try using <!!, it blocks indefinitely.
if the channel is never closed (so a take never returns nil), then the go-loop will park (instead of block) indefinitely, right? Switching from parking to blocking with <!! would then block indefinitely too, which makes sense.
If this go block goes out of scope, then it might get GC'ed, which I think would be a difference from blocking a thread.
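To make the parking-versus-blocking point concrete, here is a small sketch. The helper drain-or-timeout and the two example channels are hypothetical names used only for this illustration. It drains a channel with a/into and races that against a timeout, so a channel that is never closed shows up as :timed-out rather than hanging the calling thread.

```clojure
(require '[clojure.core.async :as a])

(defn drain-or-timeout
  "Blocks until `ch` closes and returns everything taken from it as a
  vector, or returns :timed-out if that takes longer than `ms`."
  [ch ms]
  (let [[v _] (a/alts!! [(a/into [] ch) (a/timeout ms)])]
    ;; a/into always yields a vector, so nil can only come from the timeout.
    (if (nil? v) :timed-out v)))

;; Same contents, but only the first channel is ever closed.
(def closed-ch (doto (a/chan 2) (a/>!! 1) (a/>!! 2) (a/close!)))
(def open-ch   (doto (a/chan 2) (a/>!! 1) (a/>!! 2)))

(drain-or-timeout closed-ch 200) ; => [1 2]
(drain-or-timeout open-ch 200)   ; => :timed-out
```

A plain (a/<!! (a/into [] open-ch)) would block the calling thread forever, which seems to match the behaviour described above.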
What you're saying makes sense, but I'm failing to figure out the exact place where I need to close the channel. I just pushed a longer post here https://clojureverse.org/t/recursive-async-pipeline/10308 with a code snippet.
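One possible answer to "where do I close the channel", sketched under assumptions: keep a counter of nodes that have been queued but not yet processed, and close in when it drops to zero. The names walk-tree, pending and tree are hypothetical, and in-memory maps stand in for the real API calls. This is one approach among several, not necessarily what the linked post ends up doing.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical data standing in for API responses.
(def tree [{:name "Mark"
            :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                       {:name "Edward" :children []}]}
           {:name "Peter" :children [{:name "Brandon"}]}])

(defn walk-tree
  "Returns a channel that yields the :name of every node reachable from
  `roots` and closes once the whole tree has been visited."
  [roots]
  (let [in      (a/chan)
        out     (a/chan)
        ;; Number of nodes queued on `in` but not yet fully processed.
        pending (atom (count roots))]
    (a/pipeline-async
     1
     out
     (fn [node res]
       (let [children (:children node)]
         ;; Count the children before marking this node as done, so the
         ;; counter cannot reach zero while work is still queued.
         (swap! pending + (count children))
         (doseq [child children]
           (a/put! in child))
         (a/put! res (:name node))
         (a/close! res)
         ;; The last node to finish closes `in`; the pipeline then closes `out`.
         (when (zero? (swap! pending dec))
           (a/close! in))))
     in)
    (if (empty? roots)
      (a/close! in)
      (doseq [root roots]
        (a/put! in root)))
    out))

;; Because `out` now closes, a blocking drain terminates.
(a/<!! (a/into [] (walk-tree tree)))
```

With real HTTP calls the fetch would happen inside the function passed to pipeline-async (for example on another thread), and the counter update would move into the completion callback. put! also has a limit on pending puts per channel, so a very wide tree would need a buffered in channel or a different feeding strategy.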