#core-async
2023-09-10
ag 00:09:10

Can someone show me a simple example of a recursive async pipeline? I can't wrap my head around it.

ag 00:09:18

For example, if I have an API where each object has "children", and I need to fetch info for every child recursively, why can't I push things into the channel I'm using as the pipeline's input?

(require '[clojure.core.async :as a])

(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])

(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
   (fn [sibling res]
     (if-let [children (:children sibling)]
       (a/go (a/>! in (first children))) ; why can't I do something like this?
       (a/go
         (a/>! res sibling)
         (a/close! res))))
   in)

  (a/go
   (doseq [sibling hermanos]
     (a/>! in sibling)))

  (a/go-loop []
    (prn (a/<! out))
    (recur)))

phronmophobic 01:09:44

I think it gets stuck waiting for res to be closed: pipeline-async only keeps a few async operations in flight, and since res is never closed for the first few inputs, those jobs never finish. The following seems to work.

(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])

(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
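   (fn [sibling res]
     (if-let [children (:children sibling)]
       ;; re-queue the first child without parking, then close res
       ;; (with no output) so the pipeline can move on to the next job
       (a/go (a/put! in (first children))
             (a/close! res))
       ;; leaf node: emit it, then close res to mark the job as done
       (a/go
         (a/>! res sibling)
         (a/close! res))))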
   in)


  (a/go
    (doseq [sibling hermanos]
      (a/>! in sibling)))

  (a/go-loop []
    (prn (a/<! out))
    (recur)))
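
A minimal sketch of the rule that makes the version above work, assuming core.async 1.2 or later (for to-chan!): pipeline-async gives af a fresh res channel for each input and treats that job as unfinished until res is closed, holding back every later result in the meantime. One input may produce several outputs before res is closed. The names doubled-af and run-doubled are hypothetical.

```clojure
(require '[clojure.core.async :as a])

;; doubled-af and run-doubled are hypothetical names for this sketch.
;; af contract: put zero or more results onto res, then close res.
(defn doubled-af [x res]
  (a/go
    (a/>! res x)        ; first output for this input
    (a/>! res (* 2 x))  ; one input may produce several outputs
    (a/close! res)))    ; signals "this job is done"

(defn run-doubled [xs]
  (let [in  (a/to-chan! xs)   ; closes itself after the last item
        out (a/chan)]
    ;; once in is closed and every res has been closed,
    ;; pipeline-async closes out, so the blocking take below returns
    (a/pipeline-async 1 out doubled-af in)
    (a/<!! (a/into [] out))))
```

Leaving out the (a/close! res) line reproduces the stall from the original question: the first job never finishes, so nothing after it is ever emitted.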

hiredman 01:09:20

Feeding the output of a pipeline back into its input makes it easy to deadlock.

phronmophobic 01:09:40

Yea, I'm not sure this setup is a good idea.

hiredman 01:09:48

And very easy when one input becomes multiple outputs.

hiredman 01:09:16

You might be fine just giving the channel a very large buffer.

hiredman 01:09:24

But you can think about it as being sort of like putting a mic next to a speaker with an amplifier between them.

ag 01:09:19

@U7RJTCH6J it doesn't seem to work as expected: it's now only traversing children, not root-level nodes.

phronmophobic 01:09:50

that’s because they have children!

phronmophobic 01:09:27

elements only get added to the result chan if they don’t have children

phronmophobic 01:09:40

per the function passed in

phronmophobic 01:09:02

try rewriting the pipeline function to also add elements [to the results chan] when they have children
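
One way to apply that suggestion while also sidestepping the feedback loop hiredman warns about is to let af emit the node and walk its whole subtree itself, so nothing is ever put back onto in and the input can be closed normally. This is a sketch, not the thread's solution; it assumes each node's children are already present in the map (in the real API each step would be a request, and a request made inside af's go block would need to be non-blocking or moved to a/thread). subtree-af and all-names are hypothetical names.

```clojure
(require '[clojure.core.async :as a])

;; subtree-af and all-names are hypothetical names for this sketch.
;; ASSUMPTION: children are already in each node map; a real version
;; would fetch them, without blocking inside the go block.
(defn subtree-af [node res]
  (a/go
    ;; depth-first walk with an explicit stack (a vector, top = last)
    (loop [stack [node]]
      (when (seq stack)
        (let [n (peek stack)]
          (a/>! res (:name n))                 ; emit every node, with or without children
          (recur (into (pop stack)
                       (reverse (:children n)))))))  ; first child ends up on top
    (a/close! res)))                           ; job finished: let the pipeline move on

(defn all-names [roots]
  (let [in  (a/to-chan! roots)   ; closes itself after the last root
        out (a/chan)]
    (a/pipeline-async 1 out subtree-af in)
    (a/<!! (a/into [] out))))    ; out closes once in is drained and every res is closed
```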

ag 01:09:59

Turning it into when-let is not working either. I think I'm fundamentally doing something wrong here. That's why I'm desperately trying to find an example of a recursive async pipeline.

phronmophobic 01:09:11

(fn [sibling res]
  (if-let [children (:children sibling)]
    (a/go (a/put! in (first children))
          (a/close! res))
    (a/go
      (a/>! res sibling)
      (a/close! res))))

what values are being put onto the res chan?

ag 01:09:25

Oy... wait... I think it works...

ag 01:09:29

Thank you. You were right about closing the res channel. Now I need to rewrite this for the actual thing I'm trying to achieve. Thank you, Adrian!

phronmophobic 01:09:05

The doc string is super confusing to me, tbh.

phronmophobic 01:09:19

I had to check the community comments to make sense of it: https://clojuredocs.org/clojure.core.async/pipeline-async

phronmophobic 01:09:29

> I think the phrase "af must close! the channel before returning." is confusing, and should be changed to something along the lines of "af must close! the channel before the async operation completes."

phronmophobic 01:09:10

The suggested update is also a bit confusing, though slightly less so; but tbf, async code is inherently hard.

ag 01:09:32

Right. I wasn't even thinking about the res channel. Stupid me.

ag 01:09:51

You saved me a few hours of frustration.

ag 18:09:08

Hey friends, I have another pipeline-related question. Right now I have an async pipeline that realizes the children of nodes while fetching from an API (each node may have children, each child node can have its own children, and so on, and we need to send a GET request for every child). Even though I'm using clj-http.client/get (which is synchronous), I couldn't figure out how to backfill into the from channel (the 4th argument of core.async/pipeline) when a node has children, so it never worked with a non-async pipeline. I think I still fundamentally need an async pipeline for this job. The async pipeline does the work, but now I have this headache (I remember stumbling upon it a while ago and I still don't know how to deal with it). The fundamental problem is that there's no good way of knowing when "there's no more data to fetch". So, my question is: *what is the idiomatic way of draining a channel coming out of an async pipeline?* That is, how do I turn the async result into a non-async/blocking call?
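
An alternative worth sketching (not ag's approach) that works with a synchronous client and gives a natural end-of-data signal is to avoid backfilling altogether: run pipeline-blocking once per tree level over an input channel that closes itself, collect that level with <!!, and stop when the next level is empty. fetch-children is a hypothetical stand-in for a blocking call such as clj-http.client/get; the parallelism of 4 is arbitrary; core.async 1.2+ is assumed for to-chan!.

```clojure
(require '[clojure.core.async :as a])

;; HYPOTHETICAL stand-in for a blocking request (e.g. via clj-http.client/get).
;; Here it just reads the children already stored in the node map.
(defn fetch-children [node]
  (vec (:children node)))

(defn fetch-level
  "Fetches one level of nodes with pipeline-blocking. to-chan! closes the
  input after the last node, so the pipeline closes out and <!! returns."
  [nodes]
  (let [out (a/chan)]
    (a/pipeline-blocking 4
                         out
                         (map (fn [node]
                                {:name     (:name node)
                                 :children (fetch-children node)}))
                         (a/to-chan! nodes))
    (a/<!! (a/into [] out))))

(defn fetch-tree
  "Breadth-first traversal returning node names level by level.
  'No more data to fetch' is simply an empty next level."
  [roots]
  (loop [level (vec roots)
         names []]
    (if (empty? level)
      names
      (let [fetched (fetch-level level)]
        (recur (vec (mapcat :children fetched))
               (into names (map :name) fetched))))))
```

The trade-off is that each level waits for its slowest request before the next level starts, in exchange for never feeding a pipeline's output back into its own input.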

ag 18:09:02

If the function that sets up my async pipeline returns the to channel, this works as expected, right?

(let [ch (retrieve-all-nodes)]
  (go-loop []
    (when-let [val (<! ch)]
      (println val)
      (recur))))
But if I try using <!!, it blocks indefinitely.

Bob B 20:09:14

if the channel is never closed (so a take never returns nil; nil itself can't be put on a channel), then this will park (rather than block) indefinitely, right? So switching from parking to blocking would mean blocking indefinitely, which makes sense. If this go block goes out of scope, then it might get GC'ed, which I think would be a difference from blocking a thread.
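
A small sketch of that point: a blocking drain returns exactly when the producer closes the channel, and not before. produce and drain!! are hypothetical names; (a/<!! (a/into [] ch)) is an equivalent one-liner.

```clojure
(require '[clojure.core.async :as a])

;; drain!! and produce are hypothetical names for this sketch.
(defn drain!!
  "Blocking drain: collects values until ch is closed. If nothing ever
  closes ch, this blocks forever, just as a go-loop version parks forever."
  [ch]
  (loop [acc []]
    (if-some [v (a/<!! ch)]
      (recur (conj acc v))
      acc)))

(defn produce [n]
  (let [ch (a/chan)]
    (a/go
      (dotimes [i n]
        (a/>! ch i))
      (a/close! ch))   ; without this, drain!! would never return
    ch))
```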

ag 20:09:47

What you're saying makes sense, but I'm failing to figure out the exact place where I need to close the channel. I just posted a longer write-up here https://clojureverse.org/t/recursive-async-pipeline/10308 with a code snippet.
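
One common answer to "where do I close it?" for a self-feeding pipeline is to count outstanding work: add one for every node queued, subtract one when a node's job finishes, and close in when the count reaches zero; pipeline-async then closes out by itself, so <!! stops blocking. This is a sketch of that counting technique, not the code from the linked post. traverse is a hypothetical name, children are assumed to be present in each node map, core.async 1.2+ is assumed for onto-chan!, and put! plus a large buffer on in follows hiredman's suggestion so af never parks on the input.

```clojure
(require '[clojure.core.async :as a])

;; traverse is a hypothetical name; children are ASSUMED to be in each node map.
(defn traverse
  "Returns the names of all nodes reachable from roots. Order across
  branches is not guaranteed, because children are re-queued onto in
  while other roots are still arriving."
  [roots]
  (if (empty? roots)
    []                                   ; nothing queued, so nothing would ever close in
    (let [in      (a/chan 1024)          ; large buffer, as hiredman suggested
          out     (a/chan)
          pending (atom (count roots))   ; nodes queued but not yet finished
          af      (fn [node res]
                    (a/go
                      (let [children (:children node)]
                        ;; count the children BEFORE this node is marked done,
                        ;; so pending cannot reach zero while work remains
                        (swap! pending + (count children))
                        (doseq [child children]
                          (a/put! in child))   ; non-parking, unlike >!
                        (a/>! res (:name node))
                        (a/close! res)         ; this job is finished
                        (when (zero? (swap! pending dec))
                          ;; nothing is queued or in flight: end the input,
                          ;; and the pipeline will close out
                          (a/close! in)))))]
      (a/pipeline-async 1 out af in)
      (a/onto-chan! in roots false)      ; false: keep in open for the children
      (a/<!! (a/into [] out)))))
```

Note that put! allows at most 1024 pending puts per channel on top of the buffer, so a very wide tree may need a bigger buffer or a different design.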

ag 20:09:10

For now, I have resorted to using alts! with a timeout channel. It either gets everything or times out (whichever happens first). Not ideal, but it works for me.
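
For reference, the alts!/timeout workaround described here might look roughly like the following sketch; drain-with-timeout is a hypothetical name. It uses one overall deadline, so if the deadline fires first the caller silently gets a partial result, which is why closing the channel once the work is done is usually preferable.

```clojure
(require '[clojure.core.async :as a])

;; drain-with-timeout is a hypothetical name for this sketch.
(defn drain-with-timeout
  "Collects values from ch until ch closes or ms milliseconds pass,
  whichever comes first. A single deadline covers the whole drain."
  [ch ms]
  (let [deadline (a/timeout ms)]
    (a/<!!
     (a/go-loop [acc []]
       (let [[v port] (a/alts! [ch deadline])]
         (if (and (= port ch) (some? v))
           (recur (conj acc v))   ; got a value: keep draining
           acc))))))              ; ch closed, or the deadline fired
```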

ag 18:09:40

Maybe I need to close channel(s) somewhere in my pipeline function? See the previous thread I've started.