
I'm trying to figure out how core.async works (pipeline-async), but I am struggling to understand some things. I have a Redis server where I push GitHub search URLs. I have a go-loop that reads the URLs and pushes them to an in channel, and another go-loop that reads from the out channel and prints to the console. I used pipeline-async to read URLs from in, fetch the search results from GitHub, and push them to out, one by one. The issue with the above code is that it works only once, and I can't make it "listen" to Redis and process messages continuously. The code is inspired by this gist, and I posted my solution there. Any ideas what I am doing wrong?

Ferdinand Beyer 10:04:47

You are trying to use pipeline-async with clj-http. To do that, use async HTTP calls and supply a callback method that will async/put! the result on the out channel provided by pipeline-async


thanks, looking at that too. @U0NCTKEV8 also had some pointers about redis reading.


so I have a working version, thanks for all the pointers:


I am not sure I understand why I need to put! to out and close out*. I believe closing out* is used for signaling: close = work has finished for the URL?!

Ferdinand Beyer 11:04:23

Well, first because this is the contract that pipeline-async specifies for the af async function:

> must be a function of two arguments, the first an input value and
> the second a channel on which to place the result(s). af must close!
> the channel before returning.
And you are right: af can produce zero or more results, and must close! to signal all results are ready
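The contract quoted above can be sketched roughly as follows. This is a minimal illustration, not the code from the gist: `fake-http-get` is a hypothetical stand-in for an async HTTP client that invokes a callback on another thread, and each input produces exactly one result.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical async client: does its work on another thread and then
;; hands a fake "response" to the callback.
(defn fake-http-get [url callback]
  (async/thread
    (callback {:url url :status 200})))

;; af for pipeline-async: put the result on out*, then close out*.
;; The function itself returns immediately; the callback does the work.
(defn request-handler [url out*]
  (fake-http-get url
                 (fn [response]
                   (async/put! out* response)
                   (async/close! out*))))

(def in  (async/chan 8))
(def out (async/chan 8))

(async/pipeline-async 2 out request-handler in)

;; Feed three inputs and close in, which lets the pipeline close out.
(doseq [url ["a" "b" "c"]]
  (async/>!! in url))
(async/close! in)

;; Collect everything from out; the pipeline preserves input order.
(def results (async/<!! (async/into [] out)))
```

Closing `out*` is what tells the pipeline that this input is finished, so a handler that never closes it would stall the pipeline at that input.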


I updated my solution to work properly regarding out channel . I was not putting to the channel received from pipeline async. thanks


You should be writing to out* then closing out*


yes, I fixed that just now


I think I am getting the hang of this - could not have done it without your help. I will try to migrate the sample to use pipeline-blocking.


In the exception case you are not closing out*


good point
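One way to make sure out* is closed even when the request throws is a try/finally, sketched here with a hypothetical `fetch` function that fails for one input. The blocking call runs inside `async/thread`, not in a go block.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical blocking fetch that fails for some inputs.
(defn fetch [url]
  (if (= url "bad")
    (throw (ex-info "request failed" {:url url}))
    (str "result for " url)))

(defn request-handler [url out*]
  (async/thread
    (try
      (async/>!! out* (fetch url))
      (catch Exception e
        (println "fetch failed:" (.getMessage e)))
      (finally
        ;; Always close out*, on success and on failure.
        (async/close! out*)))))

(def in  (async/chan 8))
(def out (async/chan 8))

(async/pipeline-async 2 out request-handler in)

(doseq [url ["a" "bad" "b"]]
  (async/>!! in url))
(async/close! in)

;; The failing input simply contributes no result.
(def results (async/<!! (async/into [] out)))
```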

Ferdinand Beyer 11:04:23

(defn request-handler [url out*]
Don’t do that. Just run client/get directly, not in a go block

Ferdinand Beyer 11:04:59

Also, something you should never forget to do in core.async (I learned it the hard way): You need to ensure that your application will wait for async processes to finish. In your case, the pipeline-blocking will return a channel that will close when done. If you call this from -main without blocking the main thread on this channel, your application will immediately end without waiting for the async processes to finish.
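A minimal sketch of keeping the main thread alive until the work is done. Instead of blocking on the channel returned by the pipeline, this variant blocks on draining the out channel, which closes once the pipeline has finished. `slow-inc` and `run-pipeline` are hypothetical names used only for illustration.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical blocking work item.
(defn slow-inc [n]
  (Thread/sleep 50)
  (inc n))

(defn run-pipeline [inputs]
  (let [in  (async/chan)
        out (async/chan 16)]
    (async/pipeline-blocking 4 out (map slow-inc) in)
    ;; Feed the inputs from a separate thread, then close in.
    (async/thread
      (doseq [x inputs]
        (async/>!! in x))
      (async/close! in))
    ;; Block the calling thread until out is closed and fully drained.
    (async/<!! (async/into [] out))))

(defn -main [& _]
  (println (run-pipeline (range 5))))
```

Without the final blocking take, `-main` would return as soon as the pipeline was started.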


I fixed the function.


For pipeline-blocking I have this code, but it puts a sequence on the channel instead of individual values. I guess the semantics are different in this case and I will have to update my code to take a sequence from the out channel. Just asking: is there a way to put! individual values in the case of pipeline-blocking?!

Ferdinand Beyer 11:04:56

  (async/>! out* (get repo "clone_url"))
Don’t run a go just to >!, use async/put! instead.

Ferdinand Beyer 12:04:57

In the pipeline-blocking case: Nope, it will put the return value on the out channel. But you can use a transducer with your out channel:

(let [out (async/chan 4 cat)]
The cat transducer will take sequences and concatenate them, so you still put sequences on the out channel, but readers get the individual values.
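A small standalone sketch of that behavior, using made-up values in place of the clone URLs:

```clojure
(require '[clojure.core.async :as async])

;; Channel with a buffer of 4 and the cat transducer.
(def out (async/chan 4 cat))

;; Writers put whole sequences on the channel...
(async/>!! out ["a" "b"])
(async/>!! out ["c"])
(async/close! out)

;; ...and readers take the individual elements.
(def values (async/<!! (async/into [] out)))
```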


You don't have a go loop there


There is a go, but no loop in it


You mean in the handler function I should have a go-loop instead of a go?


You are also doing IO in the go, which you should not do


I don't understand what you mean, could you please rephrase it ?


You are doing blocking IO reading from Redis in a go, which blocks the thread pool that go blocks run on


You are not supposed to do blocking operations in go because it will starve other go blocks


I don't think that is the case. This code prints "Sleeping" every 1s since car/lpop returns nil if there is no value

(async/go-loop []
  (if-let [message (wcar* (car/lpop "queue"))]
    (do
      (println "Got message from queue" message)
      (async/>! in message))
    ; Sleep if no messages available
    (do
      (println "Sleeping")
      (async/<! (async/timeout 1000))))
  (recur))


If it is the cause of your issue or not I don't know, but it is absolutely the case that you should not do it, and it will cause issues if you do


thanks, I will take a look at this as well - seems redis is synchronous by nature .


It will depend on the redis client you use, but in general there is no problem doing blocking io, just don't do it directly on a go


I use carmine


Wrap your io in the thread macro
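A rough sketch of what that could look like for the polling loop. `blocking-pop!` is a hypothetical stand-in for the blocking Redis call (something like `(wcar* (car/lpop "queue"))`), backed here by an atom so the example runs without Redis. The `stop?` flag is also an assumption, added only so the loop can be shut down.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking Redis pop; returns nil when empty.
(def queue (atom ["url-1" "url-2"]))

(defn blocking-pop! []
  (Thread/sleep 10) ; simulate network latency
  (let [[old _] (swap-vals! queue #(vec (rest %)))]
    (first old)))

;; Poll on a dedicated thread, not in a go block, so blocking calls
;; (the pop, >!! and Thread/sleep) do not tie up the go thread pool.
(defn start-poller [in stop?]
  (async/thread
    (loop []
      (when-not @stop?
        (if-let [message (blocking-pop!)]
          (async/>!! in message)
          (Thread/sleep 100)) ; nothing queued, back off
        (recur)))))

(def in (async/chan 8))
(def stop? (atom false))
(def poller (start-poller in stop?))

(def received [(async/<!! in) (async/<!! in)])

;; Ask the poller to stop and wait for its thread to finish.
(reset! stop? true)
(async/<!! poller)
```

Inside `async/thread` the blocking variants (`>!!`, `<!!`, `Thread/sleep`) are the appropriate ones. The parking variants (`>!`, `<!`) only work inside go blocks.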


Carmine internally runs a global threadpool, which is not great


thanks, I did not know that. I'm exploring now so that is not important atm. This is what I have for now

       (if-let [message (wcar* (car/lpop "queue"))]
           (do
             (println "Got message from queue" message)
             (async/>!! in message))
           ; Sleep if no messages available
           (Thread/sleep 1000))
I wonder if carmine / redis support listeners for this kind of thing.


Redis has a whole pubsub thing