core-async

ag 2023-09-10T00:27:10.135879Z

Can someone show me a simple example of recursive async pipeline, I can't wrap my head around it

ag 2023-09-10T00:37:18.516409Z

for example, if I have an API where each object has "children" and I need to fetch info for every child and do it recursively, why can't I push things into the channel I'm using in the pipeline?

(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])

(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
   (fn [sibling res]
     (if-let [children (:children sibling)]
       (a/go (a/>! in (first children))) ; why can't I do something like this?
       (a/go
         (a/>! res sibling)
         (a/close! res))))
   in)

  (a/go
   (doseq [sibling hermanos]
     (a/>! in sibling)))

  (a/go-loop []
    (prn (a/<! out))
    (recur)))

phronmophobic 2023-09-10T01:05:44.357429Z

I think it gets stuck waiting for res to close on the first few inputs, because the pipeline limits how many async operations can be in flight. The following seems to work.

(def hermanos [{:name "Mark"
                :children [{:name "Ivy" :children [{:name "Immanuel"}]}
                           {:name "Maria" :children [{:name "Mitchell"}]}
                           {:name "Edward" :children []}]}
               {:name "Peter"
                :children [{:name "Brandon"}
                           {:name "Andrew"}
                           {:name "Megan"}]}
               {:name "Michael"
                :children [{:name "Bob"} {:name "Isaiah"} {:name "Irene"}]}
               {:name "Rachel" :children [{:name "Matthew"} {:name "Abigail"} {:name "Bethany"}]}
               {:name "Alexander" :children [{:name "Eli"} {:name "Amy"}]}
               {:name "Andrew" :children [{:name "Sofia"} {:name "Isla"}]}])

(let [in (a/chan)
      out (a/chan)]
  (a/pipeline-async
   1
   out
   (fn [sibling res]
     (if-let [children (:children sibling)]
       (a/go (a/put! in (first children))
             (a/close! res))
       (a/go
         (a/>! res sibling)
         (a/close! res))))
   in)


  (a/go
    (doseq [sibling hermanos]
      (a/>! in sibling)))

  (a/go-loop []
    (prn (a/<! out))
    (recur)))

2023-09-10T01:07:20.831979Z

Feeding the output of a pipeline into the input is easy to deadlock

phronmophobic 2023-09-10T01:07:40.329629Z

Yea, I'm not sure this setup is a good idea.

2023-09-10T01:07:48.318649Z

And very easy when one input becomes multiple outputs

2023-09-10T01:09:16.751499Z

You might be fine just giving the channel a very large buffer

2023-09-10T01:10:24.502839Z

But you can think about it as being sort of like putting a mic next to a speaker with an amplifier between them

ag 2023-09-10T01:21:19.987829Z

@smith.adriane it doesn't seem to work as expected: it's now only traversing children, not root-level nodes.

phronmophobic 2023-09-10T01:21:50.852459Z

that’s because they have children!

phronmophobic 2023-09-10T01:26:27.297459Z

elements only get added to the result chan if they don’t have children

phronmophobic 2023-09-10T01:26:40.300099Z

per the function passed in

phronmophobic 2023-09-10T01:27:02.119099Z

try rewriting the pipeline function to also add elements [to the results chan] when they have children

ag 2023-09-10T01:28:59.727939Z

Turning it into when-let is not working either. I think I'm fundamentally doing something wrong here. That's why I'm desperately trying to find an example of a recursive async pipeline

phronmophobic 2023-09-10T01:30:11.083789Z

(fn [sibling res]
  (if-let [children (:children sibling)]
    (a/go (a/put! in (first children))
          (a/close! res))
    (a/go
      (a/>! res sibling)
      (a/close! res))))
what values are being put onto the res chan?

ag 2023-09-10T01:30:25.306979Z

Oy,... wait.. I think it works...

ag 2023-09-10T01:31:29.500099Z

Thank you. You were right about closing the res channel. Now I need to rewrite this for the actual thing I'm trying to achieve. Thank you, Adrian!

phronmophobic 2023-09-10T01:32:05.782259Z

The doc string is super confusing to me tbh

phronmophobic 2023-09-10T01:32:19.863629Z

I had to check the community comments to make sense of it, https://clojuredocs.org/clojure.core.async/pipeline-async

phronmophobic 2023-09-10T01:32:29.505579Z

> I think the phrase "af must close! the channel before returning" is confusing, and should be changed to something along the lines of "af must close! the channel before the async operation completes".

phronmophobic 2023-09-10T01:33:10.473929Z

the suggested update is also a bit confusing, though slightly less so. tbf, async code is inherently hard.

ag 2023-09-10T01:33:32.888989Z

Right. I wasn't even thinking about the res channel. Stupid me.

ag 2023-09-10T01:33:51.703959Z

You saved me a few hours of frustration.

ag 2023-09-10T18:28:08.422639Z

Hey friends, I have another pipeline-related question. Right now I have an async pipeline that realizes the children of nodes while fetching from an API (each node may have children, each child node can have its own children, etc., and we need to send a GET request for every child). Even though I'm using clj-http.client/get (which is non-async), I couldn't figure out how to backfill into the from channel (4th argument of core.async/pipeline) when a node has children, so it never worked with a non-async pipeline. I think I still fundamentally need to use an async pipeline for this job. The async pipeline does the work, but now I have this headache (I remember stumbling upon it a while ago and I still don't know how to deal with it). The fundamental problem is that there's no good way of knowing when "there's no more data to fetch". So, my question is: *what is the idiomatic way of draining a channel coming out of an async pipeline?* I.e., how do I turn async into a non-async/blocking call?

ag 2023-09-10T18:31:02.252899Z

If my async pipeline fn returns the to channel, this works as expected, right?

(let [ch (retrieve-all-nodes)]
  (go-loop []
    (when-let [val (<! ch)]
      (println val)
      (recur))))
But if I try using <!!, it blocks indefinitely.

Bob B 2023-09-10T20:27:14.446169Z

if the channel is never closed (so a take never returns nil), then this will park (instead of block) indefinitely, right? Switching from parking to blocking would then mean blocking indefinitely, which makes sense. If this go block goes out of scope, then it might get GC'ed, which I think would be a difference from blocking a thread.
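
To make the point about closing concrete, here is a minimal sketch of a blocking drain. It assumes the producer closes the channel after its last value; `drain!!` and `closed-example` are hypothetical names used only for illustration. A take on a closed, empty channel returns nil, and that is what ends the loop.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: blocks the calling thread until ch is closed,
;; collecting every value taken from it along the way.
(defn drain!!
  [ch]
  (loop [acc []]
    ;; <!! returns nil only when ch is closed and has nothing left.
    (if-some [v (a/<!! ch)]
      (recur (conj acc v))
      acc)))

;; The buffer of 3 lets the three puts complete before anyone takes.
(def closed-example
  (let [ch (a/chan 3)]
    (a/>!! ch 1)
    (a/>!! ch 2)
    (a/>!! ch 3)
    (a/close! ch) ; without this close!, drain!! would block forever
    (drain!! ch)))
```

The built-in `(a/<!! (a/into [] ch))` does roughly the same thing; it also only returns once the channel has been closed.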

ag 2023-09-10T20:29:47.106439Z

What you're saying makes sense, but I'm failing to figure out the exact place where I need to close the channel. I just pushed a longer post here https://clojureverse.org/t/recursive-async-pipeline/10308 with a code snippet.

ag 2023-09-10T20:31:10.874059Z

For now, I have resorted to using alts! with a timeout channel. It either gets everything or times out (whichever happens first). Not ideal, but works for me
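
A rough sketch of what such a timeout-based drain could look like, assuming an idle timeout (give up when nothing arrives for `idle-ms`) rather than one overall deadline. `drain-with-timeout!!` and `timeout-example` are hypothetical names, and the snippet in the linked post may differ.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: takes from ch until it closes or until no value
;; arrives within idle-ms milliseconds, whichever happens first.
(defn drain-with-timeout!!
  [ch idle-ms]
  (loop [acc []]
    ;; A fresh timeout channel per iteration makes this an idle timeout.
    (let [[v port] (a/alts!! [ch (a/timeout idle-ms)])]
      (if (and (= port ch) (some? v))
        (recur (conj acc v))
        acc)))) ; ch closed (nil from ch) or the timeout fired

;; This channel is never closed, so only the timeout ends the drain.
(def timeout-example
  (let [ch (a/chan 10)]
    (a/>!! ch :a)
    (a/>!! ch :b)
    (drain-with-timeout!! ch 100)))
```

The trade-off is the one already mentioned: a slow response can look like the end of the data, so the result may be incomplete.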

ag 2023-09-10T18:37:40.324269Z

Maybe I need to close channel(s) somewhere in my pipeline function? See the previous thread I've started.
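
One possible answer to "where do I close the channel", sketched under assumptions rather than taken from the thread: keep a count of nodes that are queued or in progress, and close the input channel when that count reaches zero. With the default close behaviour, pipeline-async then closes the output channel once the remaining results are delivered. `walk-all`, `pending`, `tree` and `all-names` are hypothetical names, the buffer size of 64 is arbitrary, and the in-memory `:children` lookup stands in for the real HTTP fetch.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical sketch: returns a channel that yields the name of every
;; node reachable from roots and is closed when the traversal is done.
(defn walk-all
  [roots]
  (let [in      (a/chan 64)
        out     (a/chan 64)
        ;; nodes that are queued or being processed but not yet finished
        pending (atom (count roots))]
    (a/pipeline-async
     1
     out
     (fn [node res]
       (a/go
         (let [children (:children node)]
           ;; Count the children before this node is marked as finished,
           ;; so pending cannot reach zero while work remains.
           (swap! pending + (count children))
           ;; Deliver this node's result and close res right away so the
           ;; pipeline is not held up while the children are queued.
           (a/>! res (:name node))
           (a/close! res)
           (doseq [child children]
             (a/>! in child))
           ;; The last node to finish closes the input channel.
           (when (zero? (swap! pending dec))
             (a/close! in)))))
     in)
    (if (empty? roots)
      (a/close! in)
      (a/go (doseq [root roots] (a/>! in root))))
    out))

(def tree
  [{:name "Mark"
    :children [{:name "Ivy" :children [{:name "Immanuel"}]}
               {:name "Edward" :children []}]}
   {:name "Peter"}])

;; Blocks until out is closed. The order of names is not guaranteed,
;; because roots and children are queued concurrently.
(def all-names (a/<!! (a/into [] (walk-all tree))))
```

Because the output channel is closed at the end, a blocking take such as `<!!` on `a/into` returns instead of hanging, and no timeout is needed.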