#core-async
2020-10-05
Itay 10:10:32

Hi, this is sort of my first time trying to use core.async in a real system, so the question may be silly. I would like to:
1. Move the contents of a lazy sequence onto a channel.
2. Do stuff with the channel (run it through a pipeline).
3. Take the output channel and convert it back into a lazy sequence.
That way all the core.async stuff is "black-boxed" away from the rest of the code. The issue is that the source sequence is blocking: it makes DB calls behind the scenes, so calling first or next on it can block. For this reason it doesn't seem right to use to-chan, which, from looking at the code, pulls from the sequence inside a go-loop. It seems like I want a version of to-chan that does the blocking on another thread, one that isn't part of the core.async thread pool (which AFAIK is small). So the questions are:
1. Do I need to implement it myself, or is there some ready-made function I can use for this?
2. How do I convert the output channel into a lazy sequence that I can consume without waiting for the whole process to complete?
3. Am I totally missing something here? Please elaborate 🙂
Thanks!

Jan K 14:10:56

Designing around blocking lazy sequences is IMO not a good idea; I think you'll have an easier time if you just use channels or some other kind of queue.

Itay 15:10:09

That's not very helpful, I'm afraid. I'm using Monger, which provides either a lazy sequence or an iterator that I have to wrap somehow if I want to use a channel, so I would like to know how to do that. On the other hand, our system doesn't use channels, and I would like to use core.async for this specific flow without rewriting the entire codebase, so I would also like to know how to convert a channel into what is commonly used in Clojure: a lazy sequence. The reason I want it to be lazy is to allow processing of the output while parts of it are still being generated; otherwise the response time would increase.

Jan K 16:10:14

Well, although it's not ideal, it should be possible. I found a couple of implementations on Google, e.g. https://gist.github.com/stathissideris/8659706
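For reference, the core of such a conversion can be quite small. A minimal sketch (in the spirit of the linked gist, not its exact code): each first/next does a blocking take, and a closed channel ends the sequence.

(require '[clojure.core.async :as a])

(defn chan->lazy-seq
  "Lazy seq of values taken from ch; each step blocks until a value
  arrives, and the seq ends when ch closes (the take returns nil)."
  [ch]
  (lazy-seq
   (when-some [v (a/<!! ch)]
     (cons v (chan->lazy-seq ch)))))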

Jan K 16:10:08

Instead of to-chan (which is deprecated) you can use clojure.core.async/to-chan!! or onto-chan!! to avoid blocking the go pool.
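For illustration, a hedged sketch of the calls (db-result-seq and my-ch are placeholder names, not part of any real API):

(require '[clojure.core.async :as a])

;; to-chan!! realizes the seq on its own thread rather than in a go
;; block, so a blocking source is safe; the returned channel closes
;; when the seq is exhausted.
(def results-ch (a/to-chan!! db-result-seq))

;; onto-chan!! does the same onto an existing channel:
(a/onto-chan!! my-ch db-result-seq)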

Itay 16:10:09

Thanks! A friend sent me the same link and it seems to fit the output part.

Itay 16:10:32

Oh, it's deprecated? I don't see that mentioned, nor the to-chan!! function, in the docs: https://clojuredocs.org/clojure.core.async

Itay 16:10:56

OK, found it in the (probably more official) docs: https://clojure.github.io/core.async/#clojure.core.async/to-chan!! This looks like exactly what I wanted (and just implemented myself); will try, thanks.

Jan K 16:10:48

Yeah, those docs on http://clojuredocs.org seem out of date.

orestis 17:10:01

Curious: if it starts as a lazy seq and ends as a lazy seq, why not keep it a lazy seq? What do you gain by putting core.async in the middle?

Itay 18:10:20

Each item in the input seq triggers another blocking call, so I'm using pipeline-blocking to parallelize the work with a constant bound on the number of concurrent calls.
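A minimal sketch of that wiring, with fetch-details standing in for the blocking per-item DB call (hypothetical name):

(require '[clojure.core.async :as a])

(defn process-all
  "Returns a channel of results, with at most 4 blocking calls
  in flight at any time."
  [input-seq fetch-details]
  (let [in  (a/to-chan!! input-seq) ; realize the blocking seq off the go pool
        out (a/chan 32)]
    (a/pipeline-blocking 4 out (map fetch-details) in)
    out))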

Itay 18:10:07

There is one annoying issue with to-chan!!: if the input seq throws an exception (which a DB call might), it simply closes the channel and the error gets "swallowed". Not very production-ready…
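One hedged workaround (not a library feature): realize the seq on a dedicated thread and put either the value or the caught exception on the channel, then rethrow on the consuming side.

(require '[clojure.core.async :as a])

(defn seq->chan-propagating
  "Hypothetical to-chan!! variant: an exception thrown while
  realizing xs is put on the channel instead of being swallowed.
  Assumes xs itself contains no nils or Throwables."
  [xs]
  (let [ch (a/chan 32)]
    (a/thread
      (try
        (doseq [x xs] (a/>!! ch x))
        (catch Throwable t (a/>!! ch t)))
      (a/close! ch))
    ch))

(defn take-or-throw!!
  "Blocking take that rethrows a Throwable coming off the channel."
  [ch]
  (let [v (a/<!! ch)]
    (if (instance? Throwable v) (throw v) v)))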

Jan K 22:10:53

Another option for pooled parallelization could be pmap from https://github.com/TheClimateCorporation/claypoole
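For a sense of the shape (this follows the pattern in claypoole's README; fetch-details and items are placeholders):

(require '[com.climate.claypoole :as cp])

;; cp/pmap submits tasks eagerly, bounded by the pool size;
;; with-shutdown! tears the pool down when the body exits, so the
;; results must be forced with doall before that happens.
(cp/with-shutdown! [pool (cp/threadpool 4)]
  (doall (cp/pmap pool fetch-details items)))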

noisesmith 01:10:52

Using an iterator instead of a lazy-seq and doing a channel op for each element is less complex than using a lazy-seq, IMHO; a lazy-seq is a data abstraction and not conducive to doing IO or compute tasks.
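A sketch of that approach, assuming a java.util.Iterator source (e.g. the one Monger can expose):

(require '[clojure.core.async :as a])

(defn iterator->chan
  "Hypothetical helper: drains a (possibly blocking) Iterator on a
  dedicated thread, doing one blocking put per element."
  [^java.util.Iterator it]
  (let [ch (a/chan 32)]
    (a/thread
      (while (.hasNext it)
        (a/>!! ch (.next it)))
      (a/close! ch))
    ch))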

jumar 06:10:47

Sounds like you don't need core.async at all and instead just want a primitive for limiting the maximum number of concurrent calls. As mentioned above, claypoole might be a useful thing to look at. I use something like this: https://github.com/jumarko/clojure-experiments/blob/master/src/clojure_experiments/concurrency.clj#L62-L74

(defn re-chunk
  "Re-chunks the (possibly chunked) seq xs into chunks of at most n
  elements, so consumers never realize more than n items at a time."
  [n xs]
  (lazy-seq
   (when-let [s (seq (take n xs))]
     (let [cb (chunk-buffer n)]
       (doseq [x s] (chunk-append cb x))
       (chunk-cons (chunk cb) (re-chunk n (drop n xs)))))))

(defn map-throttled
  "Like `map` but never realizes more than `max-n` elements ahead of the
  consumer of the return value.
  Useful for cases like a rate-limited asynchronous HTTP API (e.g. the
  StartQuery AWS CloudWatch Insights API).
  Uses `re-chunk`."
  [max-n f coll]
  (map f (re-chunk max-n coll)))
The re-chunk piece is there specifically to deal with "chunked" sequences and thus avoid realizing a whole 32-element chunk (and hence 32 calls) at once. In my case the AWS API is async already, but you can easily pass a future-creating function as f.
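A usage sketch (start-query and query-params are hypothetical names):

;; never realizes more than 5 queries ahead of the consumer
(def results
  (map-throttled 5 #(future (start-query %)) query-params))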

noisesmith 15:10:16

If you care about chunking, the solution isn't to "dechunk"; it is to not use lazy seqs.

Itay 15:10:56

Claypoole looks really useful; I didn't know about it. I ended up implementing something very similar to its pmap that receives the thread pool size (using pipeline-blocking inside). I may switch to it instead.
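A hedged sketch of what such a helper might look like (bounded-pmap is a hypothetical name; chan->lazy-seq is the conversion sketched earlier, repeated here for completeness):

(require '[clojure.core.async :as a])

(defn chan->lazy-seq ; as in the earlier sketch
  [ch]
  (lazy-seq
   (when-some [v (a/<!! ch)]
     (cons v (chan->lazy-seq ch)))))

(defn bounded-pmap
  "Hypothetical pmap-like helper: applies f to coll with at most n
  blocking calls in flight (pipeline-blocking inside), returning a
  lazy seq of results in order."
  [n f coll]
  (let [in  (a/to-chan!! coll)
        out (a/chan n)]
    (a/pipeline-blocking n out (map f) in)
    (chan->lazy-seq out)))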

jumar 12:10:03

@U051SS2EU I know, but it's quite handy in this case. I didn't find an easier way to do what I wanted.