#core-async
2017-01-17
joshjones19:01:24

asking anyone, but particularly @tbaldridge — I am simulating http-kit's get call, which will return a promise and which can optionally take a callback function. Based on Rich’s async talk, I did some test async code and was wondering which of the following two versions is superior? My guess is the first one, since it seems as if it would create fewer resources. First, the get call:

joshjones19:01:41

(defn http-get
  ([url callback]
   (let [sleep-time (+ 1000 (rand-int 1000))
         prom (promise)]
     (future
      (println (str "\nIn http-get's future -- about to sleep for " sleep-time "ms"))
      (Thread/sleep sleep-time)
      (deliver prom sleep-time)
      (when callback
        (callback sleep-time)))
     prom))
  ([url]
   (http-get url nil)))

joshjones19:01:15

it sleeps from 1 to 2 seconds to simulate retrieving data

joshjones19:01:37

The first version that calls this function is as follows:

(defn search
  [& requests]
  (let [result-chan (chan)]
    (run! (fn [req] (http-get req #(put! result-chan %)))
          requests)
    (alt!!
      (timeout 1500) "timed out 1500ms!"
      result-chan ([result] result))))

joshjones19:01:32

it creates a channel, then for every request it calls the http-get function and gives it a callback, which simply puts the result on the channel. The goal is to get the first reply, or a 1.5s timeout.

joshjones19:01:26

The following version seems like the wrong way to go about it, but it was similar to what was given in Rich’s talk as an example, so I want to understand if this would be a common approach:

(defn http-get-to-chan
  [url]
  (go @(http-get url)))

(defn search-v2
  [& requests]
  (let [result-chan (chan)]
    (run! #(go (>! result-chan (<! (http-get-to-chan %))))
          requests)
    (alt!!
      (timeout 1500) "timed out 1500ms!"
      result-chan ([result] result))))

joshjones19:01:39

This version is similar, except that it explicitly uses a go block, and has to create an extra channel in the http-get-to-chan function. This version avoids the callback, but introduces complexity via the extra go blocks. Thoughts on either version?

tbaldridge19:01:09

to be blunt; why are you trying to avoid the callback?

joshjones19:01:37

there is no reason at all — just wanted to explore options, and perhaps there will be a case where a library does not provide a callback option.

joshjones19:01:50

(perhaps it only returns a promise, for example)

tbaldridge19:01:51

One creates one channel for each request, the other creates 3 and just shuffles data around

tbaldridge19:01:21

But more concerning is that the second approach performs a blocking call inside a callback

tbaldridge19:01:32

and go blocks use a fixed sized thread pool.

joshjones19:01:50

you mean the http-get-to-chan yes?

tbaldridge19:01:00

yeah, the deref is blocking

joshjones19:01:01

this is the slide from Rich’s talk:

tbaldridge19:01:06

yeah, sadly that's a converted example from Go...and go is really wonky

joshjones19:01:26

so instead of doing (go …) there, I would do (thread @blocking-call)?

tbaldridge19:01:48

you could, but a better approach is to have each call to your webserver accept a channel

tbaldridge19:01:57

So you can do:

tbaldridge19:01:21

(def c (chan)) (doseq [request requests] (http-call request c)) (alt!! c ([result] result) (timeout 1500) ...)

tbaldridge19:01:29

bleh that looks ugly, but you get the point

tbaldridge19:01:08

but that will block some puts, so make the creation of the channel be (def c (chan (count requests)))

tbaldridge19:01:15

so that there's enough space for all the results
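
A minimal sketch of the channel-accepting approach described above, with the channel sized to the number of requests. The `http-call` function here is a hypothetical stand-in that sleeps instead of doing real I/O; it is not part of http-kit or any other library.

```clojure
(require '[clojure.core.async :refer [chan put! alt!! timeout]])

;; Hypothetical stand-in for a real HTTP client: sleeps 100-299 ms on another
;; thread, then puts the elapsed time on the channel it was given.
(defn http-call
  [url result-chan]
  (let [sleep-time (+ 100 (rand-int 200))]
    (future
      (Thread/sleep sleep-time)
      (put! result-chan sleep-time))
    nil))

(defn search
  [& requests]
  ;; One buffered channel with room for every reply, so the replies that
  ;; arrive after the first one still have somewhere to go.
  (let [c (chan (count requests))]
    (doseq [request requests]
      (http-call request c))
    (alt!!
      c ([result] result)
      (timeout 1500) "timed out 1500ms!")))
```

Calling `(search "a" "b" "c")` returns whichever simulated reply lands first, or the timeout string if none arrives within 1.5 seconds.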

joshjones19:01:25

ok — this is almost exactly what I did in the first search version given above, except that instead of taking a callback which uses a closure to capture the channel, it takes a channel itself — right?

tbaldridge19:01:05

both are valid though, it all depends on what interface you're given, and how you want to wrap it up.

joshjones19:01:49

ok excellent — thanks for the feedback @tbaldridge — I have to become familiar with clojure async idioms, and this was very helpful
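
For the case raised earlier, where a library only returns a promise, the `thread` variant might look like the sketch below. `http-get-promise` is a hypothetical promise-only client invented for illustration; the point is only that the blocking deref happens inside `thread` rather than inside `go`.

```clojure
(require '[clojure.core.async :refer [thread <!! alt!! timeout]])

;; Hypothetical promise-only client: returns a promise that is delivered
;; after a short delay.
(defn http-get-promise
  [url]
  (let [prom (promise)]
    (future
      (Thread/sleep 200)
      (deliver prom (str "response for " url)))
    prom))

;; `thread` runs its body on a separate thread, not on the fixed-size pool
;; that go blocks use, and returns a channel that receives the body's result.
;; The blocking deref is therefore safe here.
(defn http-get-to-chan
  [url]
  (thread @(http-get-promise url)))

(defn fetch-with-timeout
  [url]
  (alt!!
    (http-get-to-chan url) ([result] result)
    (timeout 1500) "timed out 1500ms!"))
```

The trade-off is one real thread per in-flight call, which is fine for a handful of requests but is the reason a callback or channel-accepting interface is preferable when the library offers one.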

royalaid23:01:25

@tbaldridge @joshjones Coming into and working with core.async, I can't tell you how much time I lost before finally learning that you should never make a blocking call inside a go block.

royalaid23:01:16

It might be a good idea to put somewhere in the docs that you can seize up the threadpool by doing so

royalaid23:01:53

because at first it appears that core.async == clojure-flavored go

royalaid23:01:20

and the above linked talk seems to be misleading

adamkowalski23:01:57

How would you approach working with core.async channels if you need two-way communication?

adamkowalski23:01:18

I am working with Kafka, and they have producers and consumers for streams of data and I am trying to model them as channels

adamkowalski23:01:58

But consumers can dynamically subscribe to various streams so I was thinking about having a commands queue that I can use to talk to the consumer

adamkowalski23:01:58

then when the consumer polls the broker and gets the latest data it would put it onto an output channel that someone else could consume

adamkowalski23:01:29

Am I going down the right path, or should I have one two-way channel? Or should I be approaching this completely differently?

tbaldridge23:01:12

So why do you need two-way communication? One channel for incoming messages, one for outgoing

tbaldridge23:01:27

but don't "ack" messages until they are really done processing, that's just asking for trouble
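
A rough sketch of the two-channel layout discussed above: one channel carries commands to the consumer, another carries polled records out. `fake-poll` is a hypothetical stand-in for a real broker poll, the command vectors (`[:subscribe topic]`, `[:unsubscribe topic]`, `[:stop]`) are invented for illustration, and acknowledging messages is left out entirely.

```clojure
(require '[clojure.core.async :refer [chan thread poll! >!! <!! close!]])

;; Hypothetical stand-in for polling a broker: returns one fake record per
;; subscribed topic. A real consumer would call its client library here.
(defn fake-poll
  [topics]
  (Thread/sleep 50)
  (mapv (fn [topic] {:topic topic :value (rand-int 100)}) topics))

(defn start-consumer
  "Starts a polling loop on its own thread. Returns {:commands ch :out ch}."
  []
  (let [commands (chan 10)
        out      (chan 100)]
    ;; Polling blocks, so the loop runs in `thread` rather than `go`.
    (thread
      (loop [topics #{}]
        (let [cmd (poll! commands)]   ; non-blocking check for a pending command
          (case (first cmd)
            :subscribe   (recur (conj topics (second cmd)))
            :unsubscribe (recur (disj topics (second cmd)))
            :stop        (close! out)
            ;; No command pending: poll and forward whatever came back.
            (do (doseq [record (fake-poll topics)]
                  (>!! out record))
                (recur topics))))))
    {:commands commands :out out}))
```

A caller would put `[:subscribe "orders"]` on the `:commands` channel and then take records from `:out`; putting `[:stop]` ends the loop and closes the output channel.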

hiredman23:01:02

I would treat the consumer as something like a mult, a thing that you can tap for channels that map to streams, not as a single channel
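
One way to sketch the "tap for channels that map to streams" idea is with core.async's `pub` and `sub`, used here in place of `mult` and `tap` because they route by a key. The record shape (a map with a `:topic` key) and the `stream-chan` helper are assumptions made for illustration.

```clojure
(require '[clojure.core.async :refer [chan pub sub >!! <!!]])

(def records (chan 10))               ; everything the consumer has polled
(def by-topic (pub records :topic))   ; routes each record by its :topic value

(defn stream-chan
  "Returns a new channel that receives only the records for `topic`."
  [topic]
  (let [c (chan 10)]
    (sub by-topic topic c)
    c))

(def orders (stream-chan "orders"))

(>!! records {:topic "clicks" :value 2})   ; no subscriber for "clicks": dropped
(>!! records {:topic "orders" :value 1})

(def first-order (<!! orders))
;; first-order is {:topic "orders", :value 1}
```

Each interested party asks for its own channel per stream, so subscribing dynamically becomes a matter of calling `stream-chan` (and `unsub` when done) rather than sending commands over a shared channel.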