2022-04-06
Channels
- # announcements (33)
- # babashka (13)
- # babashka-sci-dev (23)
- # beginners (94)
- # calva (105)
- # cider (37)
- # circleci (1)
- # clj-kondo (45)
- # cljs-dev (59)
- # cljsrn (2)
- # clojure (145)
- # clojure-czech (1)
- # clojure-europe (19)
- # clojure-nl (18)
- # clojure-norway (13)
- # clojure-portugal (1)
- # clojure-uk (5)
- # clojurescript (19)
- # community-development (2)
- # conjure (3)
- # copenhagen-clojurians (1)
- # core-async (34)
- # data-science (6)
- # datomic (25)
- # emacs (43)
- # fulcro (19)
- # graalvm (7)
- # graalvm-mobile (12)
- # graphql (10)
- # honeysql (3)
- # hyperfiddle (3)
- # improve-getting-started (2)
- # interop (20)
- # kaocha (3)
- # lsp (16)
- # meander (7)
- # off-topic (22)
- # other-languages (14)
- # portal (15)
- # releases (1)
- # rewrite-clj (2)
- # ring (1)
- # shadow-cljs (119)
- # spacemacs (19)
- # sql (65)
- # testing (4)
- # tools-deps (11)
- # xtdb (29)
I'm trying to figure out how core.async works (pipeline-async) but I am struggling to understand some things:
- I have a redis server where I push github search urls.
- I have a go-loop that reads the urls and pushes them to an in channel.
- I have a go-loop that reads from the out channel and prints to the console.
- I used pipeline-async to read urls from in, fetch the search results from github and push them to out, one by one.
The issue with the above code is that it works only once and I can't make it "listen" to redis and process messages continuously. The code is inspired by this gist, and I posted my solution there: https://gist.github.com/JacobNinja/5c98496a632e1a466cbf
Any ideas what I am doing wrong?
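(For orientation, a minimal sketch of the wiring described above, assuming illustrative channel sizes and a placeholder fetch-github-results function; this is not the code from the gist.)

(require '[clojure.core.async :as async])

;; urls read from redis are pushed onto `in`; search results come out of `out`
(def in  (async/chan 10))
(def out (async/chan 10))

;; placeholder af: takes a url and a fresh result channel, puts the result(s), then closes it
(defn fetch-github-results [url out*]
  (async/put! out* (str "results for " url))
  (async/close! out*))

;; up to 4 concurrent lookups feeding `out`
(async/pipeline-async 4 out fetch-github-results in)

;; consumer: print everything that lands on `out`
(async/go-loop []
  (when-let [result (async/<! out)]
    (println result)
    (recur)))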
You are trying to use pipeline-async with clj-http. To do that, use async HTTP calls and supply a callback that will async/put! the result on the out channel provided by pipeline-async.
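(A sketch of what that af callback could look like using clj-http's async mode; the function name and the decision to put the whole response body on the channel are assumptions.)

(require '[clj-http.client :as client]
         '[clojure.core.async :as async])

(defn fetch-github-results
  "af for pipeline-async: called with a url and the result channel the pipeline supplies."
  [url out*]
  (client/get url {:async? true}
              ;; success callback: put the body on out*, then close! to signal we are done
              (fn [response]
                (async/put! out* (:body response))
                (async/close! out*))
              ;; failure callback: produce zero results, but still close! so the pipeline can move on
              (fn [exception]
                (println "request failed:" (ex-message exception))
                (async/close! out*))))

;; wired in as before:
;; (async/pipeline-async 4 out fetch-github-results in)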
thanks, looking at that too. @U0NCTKEV8 also had some pointers about redis reading.
I am not sure I understand why I need to put! to out and close out*.
I believe closing out* is used for signaling. close = work has finished for the url?!
Well, first because this is the contract that pipeline-async specifies for the af async function:
> af must be a function of two arguments, the first an input value and the second a channel on which to place the result(s). af must close! the channel before returning.
And you are right: af can produce zero or more results, and must close! to signal that all results are ready.
I updated my solution to work properly regarding the out channel.
I was not putting to the channel received from pipeline-async.
thanks
I think I am getting the hang of this - could not have done it without your help. I will try to migrate the sample to use pipeline-blocking.
(defn request-handler [url out*]
  (async/go
Don't do that. Just run client/get directly, not in a go block.
Also, something you should never forget to do in core.async (I learned it the hard way): you need to ensure that your application will wait for async processes to finish.
In your case, pipeline-blocking will return a channel that closes when the work is done. If you call this from -main without blocking the main thread on that channel, your application will end immediately, without waiting for the async processes to finish.
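(A sketch of that point with illustrative names; fetch-repos is a stand-in for the blocking github call.)

(require '[clojure.core.async :as async])

(defn fetch-repos
  "Blocking stand-in for the github search call."
  [url]
  (str "repos for " url))

(defn -main [& _args]
  (let [in   (async/chan 10)
        out  (async/chan 10)
        ;; pipeline-blocking returns a channel that closes once `in` is closed and drained
        done (async/pipeline-blocking 4 out (map fetch-repos) in)]
    ;; ... start the redis reader feeding `in` and the consumer of `out` here ...
    ;; block the main thread until the pipeline finishes, otherwise the JVM can exit
    ;; before any of the async work has happened
    (async/<!! done)))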
For pipeline-blocking I have this code, but it puts a sequence on the channel instead of individual values. I guess the semantics change in this case and I will have to update my code to take a sequence from the out channel.
Just asking: is there a way to put! individual values in the case of pipeline-blocking?!
(async/go
  (async/>! out* (get repo "clone_url")))
Don't run a go just to >!, use async/put! instead. https://clojure.org/guides/core_async_go
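(Applied to the snippet above, the suggestion is simply:)

;; no go block needed just to hand a value to a channel
(async/put! out* (get repo "clone_url"))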
In the pipeline-blocking case: nope, it will put the return value on the out channel. But you can use a transducer with your out channel:
(let [out (async/chan 4 cat)] ...)
The cat transducer will take the sequences and concatenate them, so readers of the out channel get the individual values instead of sequences.
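(A small sketch of the effect; the values are made up for illustration.)

(require '[clojure.core.async :as async])

(def out (async/chan 4 cat))        ; cat flattens each collection put on the channel

(async/put! out ["url-1" "url-2"])  ; one put of a two-element vector...

(async/<!! out) ;=> "url-1"         ; ...but readers take the elements one by one
(async/<!! out) ;=> "url-2"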
You are doing blocking io reading from redis in a go, which is blocking the thread pool that go blocks run on.
You are not supposed to do blocking operations in go because it will starve other go blocks
I don't think that is the case. This code prints "sleeping" every 1s, since car/lpop returns nil if there is no value:
(async/go-loop []
  (if-let [message (wcar* (car/lpop "queue"))]
    (do
      (println "Got message from queue" message)
      (async/>! in message))
    ; Sleep if no messages available
    (do
      (println "sleeping")
      (async/<! (async/timeout 1000))))
  (recur))
If it is the cause of your issue or not I don't know, but it is absolutely the case that you should not do it, and it will cause issues if you do
thanks, I will take a look at this as well - seems redis is synchronous by nature: https://github.com/ptaoussanis/carmine/issues/148
It will depend on the redis client you use, but in general there is no problem doing blocking io, just don't do it directly on a go
thanks, I did not know that. I'm exploring now so that is not important atm. This is what I have for now
(async/thread
  (loop []
    (if-let [message (wcar* (car/lpop "queue"))]
      (do
        (println "Got message from queue" message)
        (async/>!! in message))
      ; Sleep if no messages available
      (Thread/sleep 1000))
    (recur)))
I wonder if carmine / redis support listeners for this kind of thing.
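(Not an authoritative answer, but two things worth checking: Redis supports blocking list pops (BLPOP), which carmine exposes as car/blpop, and carmine also has a Pub/Sub listener API. A rough sketch of the blocking-pop variant, reusing the wcar* helper and the in channel from above:)

(async/thread
  (loop []
    ;; BLPOP blocks on the redis server for up to 5 seconds and returns [key value],
    ;; or nil on timeout, so no poll-and-sleep is needed
    (when-let [[_ message] (wcar* (car/blpop "queue" 5))]
      (println "Got message from queue" message)
      (async/>!! in message))
    (recur)))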