This is sort of my first time trying to use core.async in a real system, so this question may be silly:
I would like to:
1. Move the contents of a lazy sequence into a channel.
2. Do stuff with the channel (run it through a pipeline).
3. Take the output channel, and convert it back to a lazy sequence.
So all the core.async stuff is “black-boxed” away from the rest of the code.
The issue is that the source sequence is blocking - it makes DB calls behind the scenes, so calling `next` on it could block. For this reason, it doesn’t seem right to use `to-chan`, which, from looking at the code, pulls from the sequence inside a `go-loop`. It seems like I want a version of `to-chan` that does the blocking on another thread, one that isn’t part of the core.async thread pool (which AFAIK is small). So the question is:
1. Do I need to implement it myself, or is there some readymade function I can use for this?
2. How do I convert the output channel into a lazy-sequence (that I can consume from without waiting for the whole process to complete)?
3. Am I totally missing something here? Please elaborate 🙂
Designing around blocking lazy sequences is IMO not a good idea, I think you'll have an easier time if you just use channels or some other kind of queue.
That’s not very helpful, I’m afraid - I’m using Monger, which provides either a lazy sequence or an iterator that I have to wrap somehow if I want to use a channel - I would like to know how to do that. On the other hand, our system doesn’t use channels, and I would like to use core.async for this specific flow without rewriting the entire codebase - so I would like to know how to convert a channel into what is commonly used in Clojure: a lazy sequence. The reason I want it to be lazy is to allow processing of the output while parts of it are still being generated - otherwise response time would increase.
Well, although it’s not ideal, it should be possible - I found a couple of implementations on Google, e.g. https://gist.github.com/stathissideris/8659706
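For reference, the gist linked above boils down to something like this minimal sketch (the helper name is mine, not from the gist; each step of the `lazy-seq` does a blocking take, so it must be consumed from a regular thread, never inside a go block):

```clojure
(require '[clojure.core.async :as a])

;; Bridge a channel back to a lazy sequence. Each realization step
;; blocks on <!! until a value arrives; a nil (channel closed) ends
;; the sequence, so the channel is hidden from the consumer.
(defn chan->lazy-seq [ch]
  (lazy-seq
    (when-some [v (a/<!! ch)]
      (cons v (chan->lazy-seq ch)))))

;; (chan->lazy-seq (a/to-chan!! (range 3))) => (0 1 2)
```

Note the usual caveat: because `nil` signals channel close, this only works for channels that never carry `nil` values (channels can’t carry `nil` anyway).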
Instead of `to-chan` (which is deprecated) you can use `onto-chan!!` to avoid blocking the go pool.
Oh, it’s deprecated? I don’t see that mentioned, nor the `to-chan!!` function, in the docs.
OK, found it in the probably more official docs: https://clojure.github.io/core.async/#clojure.core.async/to-chan!! This looks like exactly what I wanted (and had just implemented myself) - will try it, thanks.
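Assuming a recent core.async (`to-chan!!` and the other `!!` variants appeared in 1.0.567), usage looks like this; `a/into` is used here only to drain the channel for illustration:

```clojure
(require '[clojure.core.async :as a])

;; to-chan!! realizes the (possibly blocking) collection on a separate
;; thread and puts its contents on the returned channel, closing the
;; channel when the collection is exhausted - so the go pool is never
;; blocked by the slow source.
(let [ch (a/to-chan!! (range 5))]
  (a/<!! (a/into [] ch)))
;; => [0 1 2 3 4]
```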
Curious - if it starts as a lazy seq and ends as a lazy seq, why not keep it a lazy seq? What do you gain by putting core.async in the middle?
Each item in the input seq triggers another blocking call, so I’m using `pipeline-blocking` to parallelize the work with a constant bound on the number of concurrent calls.
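Put together, the flow described here might look like this sketch (`fetch-details` is a hypothetical stand-in for the blocking per-item call; the pool size 8 and buffer size 16 are arbitrary choices):

```clojure
(require '[clojure.core.async :as a])

;; Bounded parallel processing of a blocking source sequence.
(defn process [xs fetch-details]
  (let [in  (a/to-chan!! xs) ; realize the blocking seq off the go pool
        out (a/chan 16)]
    ;; at most 8 concurrent blocking calls; closes `out` when `in`
    ;; is exhausted, and output order matches input order
    (a/pipeline-blocking 8 out (map fetch-details) in)
    out))
```

The returned channel can then be bridged back to a lazy sequence, as discussed above, keeping core.async out of the callers’ view.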
There is one annoying issue with `to-chan!!` - if the input seq throws an exception (which a DB call might), it simply closes the channel and the error gets “swallowed” - not very production-ready…
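One way around that (my own sketch, not something core.async provides) is a hand-rolled variant that delivers the `Throwable` itself on the channel, so the consumer can detect it and rethrow:

```clojure
(require '[clojure.core.async :as a])

;; Like to-chan!!, but a failure while realizing the seq is delivered
;; as the channel's final value instead of being silently swallowed.
(defn to-chan-catching!! [coll]
  (let [ch (a/chan)]
    (a/thread
      (try
        (doseq [x coll] (a/>!! ch x))
        (catch Throwable t
          (a/>!! ch t)))
      (a/close! ch))
    ch))
```

The consumer then checks each value with `(instance? Throwable v)` and rethrows, turning the swallowed error back into a visible one.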
Another option for pooled parallelization could be `pmap` from https://github.com/TheClimateCorporation/claypoole
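A sketch of the claypoole approach (the pool size 4 is an arbitrary choice; `cp/pmap` takes an explicit pool as its first argument, runs eagerly but bounded by the pool, and returns results in input order):

```clojure
(require '[com.climate.claypoole :as cp])

;; with-shutdown! tears the pool down when the body exits.
(cp/with-shutdown! [pool (cp/threadpool 4)]
  (doall (cp/pmap pool inc [1 2 3])))
;; => (2 3 4)
```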
Using an iterator instead of a lazy seq and doing a channel op for each element is less complex than using a lazy seq, IMHO - lazy-seq is a data abstraction and not conducive to doing IO or compute tasks.
Sounds like you don't need core.async at all and instead just want a primitive for limiting max number of concurrent calls. As mentioned above, claypoole might be a useful thing to look at. I use something like this: https://github.com/jumarko/clojure-experiments/blob/master/src/clojure_experiments/concurrency.clj#L62-L74
The `re-chunk` piece is there specifically to deal with “chunked” sequences, thus avoiding realization of a whole chunk of 32 calls at once. In my case, the AWS api is async already, but you can easily pass
```clojure
(defn re-chunk [n xs]
  (lazy-seq
    (when-let [s (seq (take n xs))]
      (let [cb (chunk-buffer n)]
        (doseq [x s] (chunk-append cb x))
        (chunk-cons (chunk cb) (re-chunk n (drop n xs)))))))

(defn map-throttled
  "Like `map` but never realizes more than `max-n` elements ahead of the
  consumer of the return value. Useful for cases like a rate-limited
  asynchronous HTTP API (e.g. the StartQuery AWS CloudWatch Insights API).
  Uses `re-chunk`."
  [max-n f coll]
  (map f (re-chunk max-n coll)))
```
if you care about chunking the solution isn't to "dechunk" it is to not use lazy seqs
`claypoole` looks really useful, I didn’t know about it.
I ended up implementing something very similar to its `pmap` that receives the thread-pool size (using `pipeline-blocking` inside). I may switch to it instead.
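For completeness, a `pmap`-style wrapper over `pipeline-blocking` along the lines described here might look like this (my sketch under those assumptions, not the actual implementation):

```clojure
(require '[clojure.core.async :as a])

;; A bounded, order-preserving pmap: at most n concurrent blocking
;; calls, with the result exposed as an unchunked lazy sequence.
(defn bounded-pmap [n f coll]
  (let [out (a/chan n)]
    (a/pipeline-blocking n out (map f) (a/to-chan!! coll))
    ;; repeatedly is unchunked; nil signals channel close.
    ;; Caveat: f must never return nil, or the seq ends early.
    (take-while some? (repeatedly #(a/<!! out)))))

;; (bounded-pmap 4 inc (range 5)) => (1 2 3 4 5)
```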