#core-async
2020-05-30
stephenmhopper 14:05:28

I have a system where I’m pulling data from an API with 5 concurrent connections. Each connection puts its results onto a channel using (async/onto-chan! out-channel data false) and then calls async/close! once data (the sequence of entries returned by the API) is empty. These channels are all merged using async/merge. I then create a lazy seq of results using this snippet:

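(require '[clojure.core.async :refer [<!!]])

(defn seq!!
  "Returns a (blocking!) lazy sequence read from a channel."
  [c]
  (lazy-seq
   ;; <!! blocks for the next value and returns nil once c is closed and
   ;; drained, ending the seq. when-some keeps a false value from ending it.
   (when-some [v (<!! c)]
     (cons v (seq!! c)))))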

Calling count on the returned lazy seq should give about 170 records, but it returns 10 (I’ve also seen 30 come back). The API’s page size is 20. I’m not setting any buffer sizes explicitly. Is core.async defaulting to a buffer size of 10 somewhere? Any ideas on why this isn’t working?

stephenmhopper 14:05:23

I resolved the issue. The problem was that I was creating the channels for my API workers with (async/chan), without specifying a buffer size. I’m guessing that caused most of the results to be discarded? Is that accurate?

ghadi 15:05:25

A bufferless channel (async/chan) is a “rendezvous” channel: a put into it doesn’t complete until there is a take on the other side.
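A small REPL sketch of the rendezvous behaviour, assuming clojure.core.async is on the classpath (rendezvous-demo is a made-up name). offer! only succeeds if the put can complete immediately, so it fails on an unbuffered channel with no waiting taker but succeeds when there is buffer space. A normal blocking put is not discarded; it just waits for a taker.

```clojure
(require '[clojure.core.async :as async])

;; offer! puts only if the put can complete immediately, without blocking.
;; It returns true on success and nil otherwise.
(async/offer! (async/chan) :x)   ; nil: unbuffered and no taker waiting
(async/offer! (async/chan 1) :x) ; true: the value sits in the buffer

(defn rendezvous-demo
  "Hypothetical helper: a blocking put on an unbuffered channel waits
  for a taker instead of being dropped."
  []
  (let [c     (async/chan)
        taker (async/go (async/<! c))] ; parks until a value is put
    (async/>!! c :y)                   ; returns once the go block takes :y
    (async/<!! taker)))                ; the go block's result

(rendezvous-demo) ; :y
```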

stephenmhopper 16:05:36

I think there’s another issue here aside from buffering. I’ve made a simplified example. Am I misusing core.async here somehow? (1) Why doesn’t this produce the same result every time? (2) Why doesn’t this produce a correct result?

ghadi 13:06:54

seq!! seems prone to misuse
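One way to sidestep seq!! (a suggestion, not something proposed in the thread) is to collect the channel with async/into, which returns a channel that yields a single collection once the source channel closes. The blocking then happens in one visible call instead of wherever the lazy seq happens to be realized, and it is obvious that the call never returns if the channel is never closed. drain!! is a hypothetical name; onto-chan! assumes core.async 1.2 or later.

```clojure
(require '[clojure.core.async :as async])

(defn drain!!
  "Hypothetical helper: blocks until channel c is closed and drained,
  then returns every value taken from it as a vector."
  [c]
  ;; async/into yields one collection, and only after c closes.
  (async/<!! (async/into [] c)))

;; Usage: onto-chan! closes c after its last put (close? defaults to true).
(def c (async/chan 10))
(async/onto-chan! c (range 5))
(drain!! c) ; [0 1 2 3 4]
```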

Jan K 16:05:05

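@stephenmhopper async/onto-chan works asynchronously: it returns a channel that closes once all of its puts have finished. You have to block on that returned channel; otherwise the c channel gets closed while onto-chan is still putting values, and the remaining values are never delivered.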
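A minimal sketch of the fix Jan K describes, assuming core.async 1.2+ (for onto-chan!); produce-then-close! and collect are hypothetical names. The producer parks on the channel returned by onto-chan! before closing c. The consumer (async/into) is started first, because with an unbuffered channel, waiting on onto-chan! with no concurrent taker would block forever.

```clojure
(require '[clojure.core.async :as async])

(defn produce-then-close!
  "Hypothetical worker: puts every item of data onto c and closes c
  only after onto-chan! reports that all puts have completed."
  [c data]
  (async/go
    ;; onto-chan! returns a channel that closes when its go-loop finishes.
    ;; Parking on it keeps close! from racing ahead of pending puts.
    (async/<! (async/onto-chan! c data false))
    (async/close! c)))

(defn collect
  "Hypothetical helper: runs one producer against an unbuffered channel
  and returns everything the consumer received."
  [data]
  (let [c      (async/chan)       ; unbuffered, as in the thread
        result (async/into [] c)] ; start the consumer before producing
    (produce-then-close! c data)
    (async/<!! result)))

(count (collect (range 100))) ; 100

;; The racy version, for comparison: close! runs as soon as onto-chan!
;; has *started*, so later puts hit a closed channel and are dropped.
;;   (async/onto-chan! c data false)
;;   (async/close! c)
```

In the racy version, onto-chan! stops at the first put that fails on the closed channel, so how many values get through depends on timing. That would plausibly account for both symptoms in the question: different results on each run, and fewer results than expected.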

stephenmhopper 16:05:22

Oh, so should I do <! on the result of onto-chan before trying to process the next iteration?
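Following Jan K’s answer, here is a sketch of the pipeline from the original question with that change applied. It rests on stated assumptions: fetch-page is a hypothetical in-memory stand-in for the API (page size 20), each of the five workers gets 34 made-up entries, and the workers run in async/thread because a real HTTP call blocks and should not run inside a go block. Since <! only works inside a go block, the blocking <!! plays that role here.

```clojure
(require '[clojure.core.async :as async])

(defn fetch-page
  "Hypothetical stand-in for the API: returns the page-th slice of
  page-size entries from data, or an empty seq past the end."
  [data page page-size]
  (->> data (drop (* page page-size)) (take page-size)))

(defn worker
  "Hypothetical API worker: pages through data, waits until each page
  has been fully put onto out, and closes out once a page is empty."
  [data]
  (let [out (async/chan)]
    ;; async/thread, because a real API call would block.
    (async/thread
      (loop [page 0]
        (let [entries (fetch-page data page 20)]
          (if (seq entries)
            (do
              ;; Wait for this page's puts before fetching the next page.
              (async/<!! (async/onto-chan! out entries false))
              (recur (inc page)))
            (async/close! out)))))
    out))

(defn collect-all
  "Runs five workers over made-up data (34 entries each) and returns
  every entry received through the merged channel."
  []
  (let [merged (async/merge
                (for [w (range 5)]
                  (worker (range (* w 100) (+ (* w 100) 34)))))]
    ;; merge closes its output only after all five inputs close.
    (async/<!! (async/into [] merged))))

(count (collect-all)) ; 170
```

Each worker has to close its own channel, since async/merge only closes its output once every input channel is closed.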

stephenmhopper 16:05:31

That makes sense. Thank you!