#core-async
2020-11-05
holyjak15:11:24

Hello! I am trying to write a fn that repeatedly reads a "page" of 20 posts from a paginated API as long as there are more pages OR until the destination channel is closed (i.e. the reader lost interest). I struggle to find a good way to do the second part, as the only way to see whether a channel is closed is to write to it. What I have now is this:

(defn async-fetch-all-posts [dst] ; dst is a channel
  (a/thread
    (try ; try wraps the loop, since recur is not allowed across try
      (loop [href "/api/posts?offset=0"]
        (let [{:keys [posts next-href]} (parse-body (fetch-posts-raw href))
              [post & more-posts]       posts ; first post, then the rest
              dst-open?                 (a/>!! dst post)]
          (a/onto-chan! dst more-posts false)
          (if (and dst-open? next-href)
            (recur next-href)
            :success)))
      (catch Exception e e)
      (finally (a/close! dst)))))
Ideally I would do only (a/onto-chan! dst posts false) but it doesn't tell me whether dst was open or not. So I either have to do it that ☝️ way (or is there a nicer one?) or I am thinking about making my own variant of onto-chan that would e.g. return a channel with a single value true/false depending on whether it copied the items. What do you think? There must be a nicer way?

👀 3
dpsutton15:11:00

use the backpressure from the onto-chan!. if the consumer keeps pulling from the destination then it's this loop's responsibility to keep filling it up. if you're never able to put all of the posts onto that channel then you don't need to keep working

holyjak18:11:09

Thank you, that is an important point that I missed!

dpsutton15:11:49

also feels like a smell to me that there are two blocks that can independently close the channel.

holyjak18:11:06

I think it is a valid use case, that I want the channel closed if there is no more input but also if the (single) consumer is not interested in any more content? I could solve the latter by passing around another "stop signal", maybe it is a misuse to use the channel for both..

hiredman16:11:28

Using onto-chan like that is kind of a waste, you could just put a cat transducer on dst

holyjak18:11:35

dst is a channel I get from the outside and am supposed to put individual posts onto. Are you suggesting to wrap it with a channel with the cat xform, i.e. something like

[dst]
(let [dst' (chan 1 cat)]
   (a/pipe dst' dst)
   ...
   (>!! dst' posts)
   ...)
? But I believe if dst is closed, dst' will stay open (and the code will block forever trying to write to it when there is no reader)
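
A minimal sketch of the wrapping idea above, assuming a buffered `dst` and made-up batch contents; `demo-cat-pipe` is a hypothetical name. It only shows that a channel with the `cat` transducer turns each batch into individual items before `pipe` forwards them. As far as the `pipe` docstring goes, `pipe` stops consuming from `dst'` once `dst` is closed but does not close `dst'`, so the concern about a blocked writer still stands.

```clojure
(require '[clojure.core.async :as a])

(defn demo-cat-pipe
  "Puts each batch onto a cat-transducing channel piped into dst and
  returns everything that arrives on dst as a vector."
  [batches]
  (let [dst    (a/chan 10)        ; stands in for the channel given from outside
        dst'   (a/chan 1 cat)     ; cat splits each batch into single items
        result (a/into [] dst)]   ; start collecting before we put anything
    (a/pipe dst' dst)             ; closes dst when dst' closes (default)
    (doseq [batch batches]
      (a/>!! dst' batch))         ; one put per batch, not per post
    (a/close! dst')
    (a/<!! result)))

(demo-cat-pipe [[:post-1 :post-2] [:post-3]])
```

The producer still writes whole batches, so the only per-post work left is done by the transducer.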

hiredman18:11:23

This is where if I had an animation of a guy climbing on to a hobby horse I would insert it

hiredman18:11:46

the structure of this process is an iteration

hiredman18:11:52

like if you were constructing a lazy seq with a pure function it would be something like (iterate f init)

holyjak18:11:26

That is correct.

hiredman18:11:39

where init is some value containing all the posts, and f iterates slicing off posts
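
To make the `(iterate f init)` framing concrete, here is a small sketch of the lazy-seq version, with no channels involved. The `pages` map and `fetch-page` are hypothetical stand-ins for the real API call, and in this sketch the iterated value is a whole page rather than "all the posts".

```clojure
;; Hypothetical in-memory stand-in for the paginated API.
(def pages
  {"/api/posts?offset=0"  {:posts [:p1 :p2] :next-href "/api/posts?offset=20"}
   "/api/posts?offset=20" {:posts [:p3]     :next-href nil}})

(defn fetch-page
  "Returns the page for href, or nil if there is no such page."
  [href]
  (get pages href))

(defn all-posts
  "Lazy seq of individual posts, following :next-href until it runs out."
  [start-href]
  (->> (fetch-page start-href)
       ;; f: previous page -> next page, or nil when there is no next-href
       (iterate (fn [page] (some-> (:next-href page) fetch-page)))
       (take-while some?)
       (mapcat :posts)))

(all-posts "/api/posts?offset=0")
```

Here the looping logic (`iterate` plus `take-while`) is separate from the splitting of batches into posts (`mapcat`), which is the decoupling being suggested.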

holyjak18:11:35

I guess this leads to a deep insight on how to restructure and improve the code?

hiredman18:11:55

so I would write a channel version of iterate

hiredman18:11:09

yeah, the idea is you can decouple the looping logic from the rest of it

holyjak18:11:55

that sounds good ❤️ Let me think about it... Though I still need to solve the problem that I have batches of posts while I want individual posts.

hiredman18:11:08

I was discussing this with someone else and ended up

(defn async-iteration [from-process
                       to-process
                       output-channel
                       & {:keys [values-selector some? init]
                          :or {values-selector vector
                               some? some?}}]
  (async/go ; >! and <! are only valid inside a go block
    (async/>! to-process init)
    (loop [value (async/<! from-process)]
      (if (some? value)
        (do
          (doseq [item (values-selector value)]
            (async/>! output-channel item))
          (async/>! to-process value) ; feed the output back into the process
          (recur (async/<! from-process)))
        (async/close! output-channel)))))
but I don't recall if that was the final version or if there was something I didn't like about it

👀 3
hiredman18:11:37

another thing to think about is, iteration is a way to turn a function into a process, in this case that would mean taking a function and making it into a go async thing that consumes from a channel and outputs to a channel, but what if you already have a process, and want to iterate it

holyjak19:11:15

I do not understand how the function is supposed to work 😢 / how to fit it to my problem. So it consumes from the from-process channel, which in my case produces batches of posts, then splits those into individual items and puts them onto the output channel. What is the purpose of the to-process ?

hiredman19:11:41

that is where your function breaks with iteration

👍 3
hiredman19:11:52

your function entirely makes the decision to continue iterating or not based on if it publishes results to the channel, not based on what it previously generated

hiredman19:11:40

an iteration is a feedback loop, where the previous output of the process is fed back into the next step of the process

hiredman19:11:41

so I assume the consumer of dst is reading from dst and when it gets some output from it that signals there are no more posts to fetch, it closes dst, which signals your function to stop fetching

hiredman19:11:54

ah, I misread your function, it doesn't break with iteration

hiredman19:11:18

the to-process in my async-iteration is equivalent to the recur with next-href in your function
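
A sketch of how the feedback loop could be wired for the posts case, under assumptions: `pages`, `page-process`, `drive-iteration` and `run-demo` are all hypothetical names, the API is replaced by an in-memory map, and the driver is a simplified take on the function above rather than the same code. The point it illustrates is that each page is sent back on `to-process`, and the process uses it to decide what to fetch next, which is the job `(recur next-href)` does in the original function.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical in-memory stand-in for the paginated API.
(def pages
  {"/p0" {:posts [1 2] :next-href "/p1"}
   "/p1" {:posts [3]   :next-href nil}})

(defn page-process
  "Takes the previous page from to-process and puts the page it links to
  on from-process. Closes from-process when there is nothing to follow."
  [to-process from-process]
  (a/go-loop []
    (let [prev (a/<! to-process)
          href (:next-href prev)]
      (if href
        (do (a/>! from-process (get pages href))
            (recur))
        (a/close! from-process)))))

(defn drive-iteration
  "Emits each page's posts on out and feeds the page back to the process."
  [from-process to-process out init]
  (a/go
    (a/>! to-process init)
    (loop [page (a/<! from-process)]
      (if (some? page)
        (do (doseq [post (:posts page)]
              (a/>! out post))
            (a/>! to-process page)          ; the feedback step
            (recur (a/<! from-process)))
        (a/close! out)))))

(defn run-demo []
  (let [to-process   (a/chan)
        from-process (a/chan)
        out          (a/chan 10)]
    (page-process to-process from-process)
    ;; The seed only needs a :next-href pointing at the first page.
    (drive-iteration from-process to-process out {:next-href "/p0"})
    (a/<!! (a/into [] out))))
```

This sketch does not handle a consumer that closes `out` early; that would need a check on the result of each put.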

👍 3
Jan K17:11:23

I think I'd just use something like this instead of onto-chan: (reduce #(or (a/>!! ch %2) (reduced :ch-is-closed)) nil posts)
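
A sketch of how a blocking put per post could replace onto-chan in the fetch loop, in the spirit of the `reduce` above but using `every?`, which also stops at the first put that returns false. `fetch-all-onto` and the in-memory `pages` map are hypothetical; `fetch-page` is any function from href to a map with `:posts` and `:next-href`.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical in-memory stand-in for the paginated API.
(def pages
  {"/p0" {:posts [1 2] :next-href "/p1"}
   "/p1" {:posts [3]   :next-href nil}})

(defn fetch-all-onto
  "Puts individual posts onto out, page by page, and stops early when out
  is closed. Returns a channel yielding :success or :consumer-gone."
  [fetch-page start-href out]
  (a/thread
    (try
      (loop [href start-href]
        (let [{:keys [posts next-href]} (fetch-page href)
              ;; >!! returns false once out is closed; every? stops there
              delivered? (every? #(a/>!! out %) posts)]
          (cond
            (not delivered?) :consumer-gone
            next-href        (recur next-href)
            :else            :success)))
      (finally (a/close! out)))))

;; Usage: a map works as the fetch function here.
(let [out    (a/chan 10)
      result (fetch-all-onto pages "/p0" out)]
  [(a/<!! result) (a/<!! (a/into [] out))])
```

Because every put blocks until the consumer takes the value or the buffer has room, the producer thread also gets backpressure from the consumer without any extra signalling channel.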
