#core-async
2022-04-06
Eugen10:04:01

I'm trying to figure out how core.async works (pipeline-async), but I am struggling to understand some things. I have a Redis server where I push GitHub search URLs. I have a go-loop that reads the URLs and pushes them to an in channel, and a go-loop that reads from the out channel and prints to the console. I used pipeline-async to read URLs from in, fetch the search results from GitHub, and push them to out, one by one. The issue with the code is that it works only once, and I can't make it "listen" to Redis and process messages continuously. The code is inspired by this gist, and I posted my solution there: https://gist.github.com/JacobNinja/5c98496a632e1a466cbf . Any ideas what I am doing wrong?

Ferdinand Beyer10:04:47

You are trying to use pipeline-async with clj-http. To do that, use async HTTP calls and supply a callback method that will async/put! the result on the out channel provided by pipeline-async
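
A minimal sketch of the callback approach described above, assuming core.async 1.2 or later. `fake-async-get` is a hypothetical stand-in for an async HTTP client call (it is not part of clj-http), and the URLs are placeholders:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for an async HTTP client: performs the "request"
;; on another thread and invokes the callback with the response.
(defn fake-async-get [url callback]
  (future
    (Thread/sleep 50)
    (callback {:url url :status 200})))

;; af for pipeline-async: start the async call and return immediately.
;; The callback puts the result on out* and then closes it.
(defn request-handler [url out*]
  (fake-async-get url
                  (fn [response]
                    (async/put! out* response)
                    (async/close! out*))))

(def in (async/chan 10))
(def out (async/chan 10))

(async/pipeline-async 4 out request-handler in)

;; onto-chan! closes in after the last URL, which lets the pipeline
;; finish and close out.
(async/onto-chan! in ["https://example.com/a" "https://example.com/b"])

;; Collect everything from out into a vector once out is closed.
(def results (async/<!! (async/into [] out)))
```

Because the handler returns right away, no pipeline worker is tied up while the request is in flight.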

Eugen10:04:33

thanks, looking at that too. @U0NCTKEV8 also had some pointers about redis reading.

Eugen11:04:11

so I have a working version, thanks for all the pointers:

Eugen11:04:37

I am not sure I understand why I need to put! to out* and close out*. I believe closing out* is used for signaling: close = work has finished for the URL?!

Ferdinand Beyer11:04:23

Well, first because this is the contract that pipeline-async specifies for the af async function:

> af must be a function of two arguments, the first an input value and
> the second a channel on which to place the result(s). af must close!
> the channel before returning.
And you are right: af can produce zero or more results, and must close! to signal all results are ready
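
To illustrate "zero or more results", here is a small sketch, assuming core.async 1.2 or later. `split-words` is a hypothetical af that emits one value per word and relies on `onto-chan!` to close the result channel when it is done:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical af: puts every word of s on out* (possibly none at all).
;; onto-chan! closes out* after the last value, which fulfils the contract.
(defn split-words [s out*]
  (async/onto-chan! out* (re-seq #"\w+" s)))

(def in (async/to-chan! ["a b" "" "c"]))
(def out (async/chan 10))

(async/pipeline-async 2 out split-words in)

;; The empty string contributes no results; input order is preserved.
(def words (async/<!! (async/into [] out)))
```

The pipeline keeps reading from each result channel until it is closed, so a missing close! stalls everything queued behind that input.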

Eugen11:04:19

I updated my solution to work properly regarding the out channel. I was not putting to the channel received from pipeline-async. Thanks.

hiredman11:04:37

You should be writing to out* then closing out*

Eugen11:04:57

yes, I fixed that just now

Eugen11:04:47

I think I am getting the hang of this - could not have done it without your help. I will try to migrate the sample to use pipeline-blocking.

hiredman11:04:55

In the exception case you are not closing out*
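
One way to cover the exception case is a `finally` clause, sketched here under the assumption of core.async 1.2 or later and Clojure 1.10 or later. `risky-fetch` is a hypothetical blocking call that fails for some inputs:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical blocking fetch that throws for the input "bad".
(defn risky-fetch [url]
  (if (= url "bad")
    (throw (ex-info "request failed" {:url url}))
    (str "body of " url)))

;; af that closes out* on both the success and the failure path.
(defn request-handler [url out*]
  (async/thread
    (try
      (async/>!! out* (risky-fetch url))
      (catch Exception e
        (println "Request failed:" (ex-message e)))
      (finally
        (async/close! out*)))))

(def in (async/to-chan! ["good" "bad" "also-good"]))
(def out (async/chan 10))

(async/pipeline-async 2 out request-handler in)

;; The failed input yields no result, but the pipeline still completes.
(def bodies (async/<!! (async/into [] out)))
```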

Eugen11:04:09

good point

Ferdinand Beyer11:04:23

(defn request-handler [url out*]
    (async/go
Don’t do that. Just run client/get directly, not in a go block

Ferdinand Beyer11:04:59

Also, something you should never forget to do in core.async (I learned it the hard way): You need to ensure that your application will wait for async processes to finish. In your case, the pipeline-blocking will return a channel that will close when done. If you call this from -main without blocking the main thread on this channel, your application will immediately end without waiting for the async processes to finish.
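
A sketch of blocking the main thread on the channel returned by the pipeline, assuming core.async 1.2 or later. `slow-upper` is a hypothetical stand-in for the real blocking work:

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])

;; Hypothetical blocking step standing in for an HTTP request.
(defn slow-upper [s]
  (Thread/sleep 100)
  (str/upper-case s))

(defn -main [& _]
  (let [in   (async/to-chan! ["a" "b" "c"])
        ;; out is buffered large enough to hold every result; otherwise
        ;; something must read from out while the pipeline runs.
        out  (async/chan 10)
        done (async/pipeline-blocking 2 out (map slow-upper) in)]
    ;; Block until the pipeline has processed all of in.
    (async/<!! done)
    ;; out is closed by now, so its buffered values can be collected.
    (async/<!! (async/into [] out))))
```

Without the `<!!` on `done`, `-main` could return before any worker has finished.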

Eugen11:04:27

I fixed the function.

Eugen11:04:25

For pipeline-blocking I have this code, but it puts a sequence on the channel instead of individual values. I guess the semantics are different in this case and I will have to update my code to take a sequence from the out channel. Just asking: is there a way to put! individual values in the case of pipeline-blocking?!

Ferdinand Beyer11:04:56

(async/go
  (async/>! out* (get repo "clone_url"))
Don’t run a go just to >!, use async/put! instead. https://clojure.org/guides/core_async_go
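
A small sketch of that suggestion. The `repo` map and its URL are placeholders, and `out*` here is a plain channel standing in for the one pipeline-async passes to af:

```clojure
(require '[clojure.core.async :as async])

(def out* (async/chan 1))
(def repo {"clone_url" "https://example.com/repo.git"})

;; put! is asynchronous and can be called from any thread,
;; so no go block is needed just to place a value on a channel.
(async/put! out* (get repo "clone_url"))
(async/close! out*)

;; The buffered value is still delivered after close!.
(def received (async/<!! out*))
```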

Ferdinand Beyer12:04:57

In the pipeline-blocking case: Nope, it will put the return value on the out channel. But you can use a transducer with your out channel:

(let [out (async/chan 4 cat)]
The cat transducer will take sequences and concatenate them, so the out channel will now accept sequences, but readers get the individual values.
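
A runnable sketch of the `cat` idea, assuming core.async 1.2 or later. `search-repos` is a hypothetical blocking function that returns a sequence per input:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking search that returns a sequence.
(defn search-repos [url]
  [(str url "/repo-1.git") (str url "/repo-2.git")])

(def flattened
  (let [in  (async/to-chan! ["u1" "u2"])
        ;; cat unpacks each sequence put on out into individual values.
        out (async/chan 4 cat)]
    (async/pipeline-blocking 2 out (map search-repos) in)
    (async/<!! (async/into [] out))))
```

Readers of `out` see four separate strings, even though each pipeline step produced a vector.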

hiredman10:04:21

You don't have a go loop there

hiredman10:04:44

There is a go, but no loop in it

Eugen10:04:21

You mean in the handler function I should have a go-loop instead of a go?

hiredman10:04:58

You are also doing io in the go, which you should not do

Eugen10:04:56

I don't understand what you mean, could you please rephrase it ?

hiredman10:04:56

You are doing blocking io reading from redis in a go, which is blocking the threadpool that go's run on

hiredman10:04:42

You are not supposed to do blocking operations in go because it will starve other go blocks

Eugen10:04:45

I don't think that is the case. This code prints "Sleeping" every 1s since car/lpop returns nil if there is no value

(async/go-loop []
  (if-let [message (wcar* (car/lpop "queue"))]
    (do
      (println "Got message from queue" message)
      (async/>! in message))
    ;; Sleep if no messages are available
    (do
      (println "sleeping")
      (async/<! (async/timeout 1000))))
  (recur))

hiredman10:04:14

If it is the cause of your issue or not I don't know, but it is absolutely the case that you should not do it, and it will cause issues if you do

Eugen10:04:36

thanks, I will take a look at this as well - seems redis is synchronous by nature https://github.com/ptaoussanis/carmine/issues/148 .

hiredman10:04:57

It will depend on the redis client you use, but in general there is no problem doing blocking io, just don't do it directly on a go

Eugen10:04:11

I use carmine

hiredman10:04:33

Wrap your io in the thread macro
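
A sketch of what that can look like when a go block needs the result. `blocking-read` is a hypothetical stand-in for a blocking Redis call:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical blocking call standing in for a Redis read.
(defn blocking-read []
  (Thread/sleep 100)
  "message")

(def result
  (async/go
    ;; thread runs its body on a separate thread and returns a channel
    ;; that receives the body's value, so the go block only parks here.
    (let [message (async/<! (async/thread (blocking-read)))]
      (str "got " message))))

(def received (async/<!! result))
```

The blocking call occupies a thread of its own, leaving the go block thread pool free for other go blocks.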

hiredman10:04:02

Carmine internally runs a global threadpool, which is not great

Eugen10:04:33

thanks, I did not know that. I'm exploring now so that is not important atm. This is what I have for now

(async/thread
  (loop []
    (if-let [message (wcar* (car/lpop "queue"))]
      (do
        (println "Got message from queue" message)
        (async/>!! in message))
      ;; Sleep if no messages are available
      (Thread/sleep 1000))
    (recur)))
I wonder if carmine / redis support listeners for this kind of thing.

hiredman10:04:50

Redis has a whole pubsub thing