This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
I'm trying to figure out how core.async works (`pipeline-async`), but I am struggling to understand some things:
I have a redis server where I push github search urls.
I have a `go-loop` that reads the urls and pushes them to an `in` channel.
I have a `go-loop` that reads from the `out` channel and prints to the console.
I use `pipeline-async` to read urls from `in`, fetch the search results from github and push them to `out`, one by one.
The issue with the above code is that it works only once and I can't make it "listen" to redis and process messages continuously.
the code is inspired from this gist and I posted my solution there https://gist.github.com/JacobNinja/5c98496a632e1a466cbf .
Any ideas what am I doing wrong ?
You are trying to use `clj-http`. To do that, use async HTTP calls and supply a callback method that will `async/put!` the result on the `out` channel provided by `pipeline-async`.
I am not sure I understand why I need to `put!` to `out` and close it.
I believe closing `out*` is used for signaling.
close = work has finished for the url?!
Well first because this is the contract that `pipeline-async` specifies for the `af` async function:
> af must be a function of two arguments, the first an input value and
> the second a channel on which to place the result(s). af must close!
> the channel before returning.
And you are right: `af` can produce zero or more results, and must `close!` to signal all results are ready.
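A minimal sketch of an `af` that follows the contract quoted above. `fake-fetch-async` is a hypothetical stand-in for an async HTTP call (it is not part of `clj-http`), and `run-pipeline` is only an illustrative helper; the point is that the callback puts each result on the channel it was handed and then closes it.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for an async HTTP client: it calls `callback`
;; with a vector of results from another thread.
(defn fake-fetch-async [url callback]
  (future (callback [(str url "/a") (str url "/b")])))

;; `af` returns immediately; the callback puts every result on `out*`
;; (the per-item channel supplied by pipeline-async) and then closes it.
(defn af [url out*]
  (fake-fetch-async
    url
    (fn [results]
      (async/go
        (doseq [r results]
          (async/>! out* r))
        (async/close! out*)))))

;; Illustrative helper: run the pipeline over `urls` and collect everything
;; that arrives on `out` once the pipeline has closed it.
(defn run-pipeline [urls]
  (let [in  (async/to-chan! urls)
        out (async/chan 10)]
    (async/pipeline-async 2 out af in)
    (async/<!! (async/into [] out))))
```

If `af` never closes `out*`, the pipeline has no way to know the item is finished and will not move on past it.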
I updated my solution to work properly regarding the `out` channel.
I was not putting to the channel received from `pipeline-async`.
I think I am getting the hang of this - could not have done it without your help. I will try to migrate the sample to use pipeline-blocking.
Don’t do that. Just run
(defn request-handler [url out*] (async/go
`client/get` directly, not in a
Also, something you should never forget to do in core.async (I learned it the hard way): You need to ensure that your application will wait for async processes to finish.
In your case, the `pipeline-blocking` will return a channel that will close when done. If you call this from `-main` without blocking the main thread on this channel, your application will immediately end without waiting for the async processes to finish.
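A small sketch of waiting on that returned channel. `slow-inc` and `run-and-wait` are hypothetical names standing in for the real blocking work and for `-main`; note that `pipeline-blocking` takes a transducer, here `(map slow-inc)`.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical blocking job, standing in for a blocking HTTP request.
(defn slow-inc [n]
  (Thread/sleep 50)
  (inc n))

(defn run-and-wait []
  (let [in      (async/to-chan! [1 2 3])
        out     (async/chan 10)
        ;; The returned channel closes once the pipeline has finished.
        done    (async/pipeline-blocking 2 out (map slow-inc) in)
        ;; Collect everything from `out` until it is closed.
        results (async/into [] out)]
    ;; Block the calling thread until the pipeline is done,
    ;; then block again until the consumer has drained `out`.
    (async/<!! done)
    (async/<!! results)))
```

Without the two `<!!` calls the function would return right after wiring things up, which is the situation described for `-main`.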
for pipeline-blocking I have this code.
put! a sequence to the channel instead of individual values.
I guess the semantics are changed in this case and I will have to update my code to take a sequence from the `out` channel.
Just asking: is there a way to put! individual values in the case of pipeline-blocking ?!
Don’t run a
(async/go (async/>! out* (get repo "clone_url"))
pipeline-blocking case: Nope, it will put the return value on the out channel. But you can use a transducer with your out channel:
(let [out (async/chan 4 cat)]
The `cat` transducer will take sequences and concatenate them, so the `out` channel will now expect sequences, but readers get the individual values.
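To make the `cat` suggestion concrete, here is a sketch under stated assumptions: `clone-urls` is a hypothetical function that returns a sequence per input (like a list of clone urls per search), and `run-flattened` is an illustrative helper. The pipeline puts each returned sequence on `out`, and the channel's `cat` transducer splits it into individual values.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical job returning a sequence of results for one input.
(defn clone-urls [n]
  [(str "repo-" n "-a") (str "repo-" n "-b")])

(defn run-flattened []
  (let [in  (async/to-chan! [1 2])
        ;; `cat` on the channel flattens every sequence that is put on it.
        out (async/chan 4 cat)]
    (async/pipeline-blocking 2 out (map clone-urls) in)
    ;; Readers see individual strings, not vectors.
    (async/<!! (async/into [] out))))
```

Another option with the same effect would be passing `(mapcat clone-urls)` as the pipeline's transducer and leaving `out` as a plain channel.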
You are doing blocking io reading from redis in a go, which is blocking the threadpool that go's run on
You are not supposed to do blocking operations in go because it will starve other go blocks
I don't think that is the case.
This code prints "sleeping" every 1s since `lpop` returns `nil` if there is no value
(async/go-loop []
  (if-let [message (wcar* (car/lpop "queue"))]
    (do
      (println "Got message from queue" message)
      (async/>! in message))
    ; Sleep if no messages available
    (do
      (println "sleeping")
      (async/<! (async/timeout 1000))))
  (recur))
If it is the cause of your issue or not I don't know, but it is absolutely the case that you should not do it, and it will cause issues if you do
thanks, I will take a look at this as well - seems redis is synchronous by nature https://github.com/ptaoussanis/carmine/issues/148 .
It will depend on the redis client you use, but in general there is no problem doing blocking io, just don't do it directly on a go
thanks, I did not know that. I'm exploring now so that is not important atm. This is what I have for now
I wonder if carmine / redis support listeners for this kind of thing.
(async/thread
  (loop []
    (if-let [message (wcar* (car/lpop "queue"))]
      (do
        (println "Got message from queue" message)
        (async/>!! in message))
      ; Sleep if no messages available
      (Thread/sleep 1000))
    (recur)))
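For anyone who wants to try the same shape without a redis server, here is a self-contained sketch. It assumes a `java.util.concurrent.LinkedBlockingQueue` as a stand-in for the redis list, and `start-poller` and the `running?` atom are hypothetical additions so the loop can be stopped; none of this is carmine API.

```clojure
(require '[clojure.core.async :as async])
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Poll `queue` on a dedicated thread (not in a go block) and forward
;; messages to the `in` channel until `running?` is reset to false.
(defn start-poller [^LinkedBlockingQueue queue in running?]
  (async/thread
    (loop []
      (when @running?
        (if-let [message (.poll queue)]
          (async/>!! in message)
          ;; Sleep if no messages are available
          (Thread/sleep 100))
        (recur)))))

;; Illustrative run: enqueue two messages, read them back, stop the poller.
(defn demo []
  (let [queue    (LinkedBlockingQueue.)
        in       (async/chan 10)
        running? (atom true)
        poller   (start-poller queue in running?)]
    (.put queue "a")
    (.put queue "b")
    (let [received [(async/<!! in) (async/<!! in)]]
      (reset! running? false)
      ;; The channel returned by async/thread closes when the loop exits.
      (async/<!! poller)
      received)))
```

Because the blocking `.poll`, `Thread/sleep`, and `>!!` calls run on their own thread, they do not tie up the threads that go blocks run on.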