#core-async
2015-06-18
matttylr08:06:44

I have a core.async design question

matttylr08:06:58

I have a periodic job that runs to get the status of a set of package shipments from various couriers (dhl, fedex, etc)

matttylr08:06:29

so I create a set of channels, issue the api call and put the returned result on the channel

matttylr08:06:52

I then core.async/merge those channels and then use alts! in a go-loop to collect all the results

matttylr08:06:12

Here's my wait function

;; assumes (require '[clojure.core.async :as async
;;                    :refer [timeout go-loop alts!]])
(defn wait-values [wait channels]
  (let [all   (async/merge channels)  ;; core.async's merge, not clojure.core/merge
        t-out (timeout wait)]
    (go-loop [values []]
      (let [[value _] (alts! [all t-out])]
        (if (nil? value)  ;; merged channel closed, or the timeout fired
          values
          (recur (conj values value)))))))
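A self-contained sketch of the fan-out feeding that function, assuming clojure.core.async is required as async. fetch-status is a hypothetical stand-in for the courier API call, and the results are collected here with async/into rather than wait-values just to keep the example runnable on its own:

```clojure
(require '[clojure.core.async :as async :refer [chan go >! close! <!!]])

(defn fetch-status [shipment]
  ;; hypothetical stand-in for the real DHL/FedEx/etc. API call
  {:shipment shipment :status :in-transit})

(def results
  (let [shipments [:s1 :s2 :s3]
        ;; one channel per shipment; each go block issues the call,
        ;; puts the result on the channel, and closes it
        channels  (doall
                    (for [s shipments]
                      (let [c (chan 1)]
                        (go (>! c (fetch-status s)) (close! c))
                        c)))]
    ;; merge the channels and collect everything into a vector
    (<!! (async/into [] (async/merge channels)))))
```

In the thread these per-shipment channels would be handed to wait-values instead.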

matttylr08:06:50

Now this is where my understanding of core.async perhaps isn't as good as it needs to be: how would this scale?

matttylr08:06:39

It's probably fine if I have a small number of shipments, but what about when the number of requests and channels gets large, with hundreds or thousands of shipments to be tracked?

slipset09:06:29

why do you merge the channels?

slipset09:06:11

I would have thought that you'd do the alts on all the channels

slipset09:06:24

also, I do think that Timothy Baldridge has a demo here https://www.youtube.com/watch?v=enwIIGzhahw where he shows usage of 1000 channels

slipset09:06:08

I guess at around 30 minutes into the talk.

slipset09:06:06

nah, he doesn't alts on that many channels.
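On the scaling question itself: go blocks and channels are lightweight, so a merge over a thousand sources is not a problem in itself. A quick illustrative sketch (the numbers and structure are arbitrary, not from the talk):

```clojure
(require '[clojure.core.async :as async :refer [chan go >! close! <!!]])

(def drained
  (let [chs (doall (repeatedly 1000 #(chan 1)))]
    ;; each go block delivers one value and closes its channel
    (doseq [[i c] (map-indexed vector chs)]
      (go (>! c i) (close! c)))
    ;; the merged channel closes once every source has closed,
    ;; so this blocking drain terminates with a count of all values
    (let [merged (async/merge chs)]
      (loop [n 0]
        (if (some? (<!! merged))
          (recur (inc n))
          n)))))
```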

matttylr09:06:42

hmm, merge is redundant?

matttylr09:06:22

I can just alts! on (conj channels t-out)
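A sketch of that merge-free version, assuming clojure.core.async is required. The closed-channel bookkeeping (dropping a port from the alts! set once it delivers or closes) is an illustration, not from the thread:

```clojure
(require '[clojure.core.async
           :refer [chan timeout go-loop alts! close! >!! <!!]])

;; Collect one result per channel, or stop early when the timeout fires.
(defn wait-values' [wait channels]
  (let [t-out (timeout wait)]
    (go-loop [pending (set channels)
              values  []]
      (if (empty? pending)
        values
        (let [[value port] (alts! (conj (vec pending) t-out))]
          (cond
            (= port t-out) values                             ;; overall timeout
            (nil? value)   (recur (disj pending port) values) ;; source closed
            :else          (recur (disj pending port)
                                  (conj values value))))))))
```

The caller takes the final vector off the returned channel, e.g. (<!! (wait-values' 5000 chans)).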

stijn09:06:42

@matttylr: if you want to introduce parallelism you may consider using a pipeline instead of a single go-loop

stijn09:06:20

hmmm, maybe not since you just want to get values until a timeout fires

matttylr09:06:56

I had looked briefly at pipelines but I think my use case is 'shorter' than that

matttylr09:06:09

I just have a (potentially large) set of parallel calls

matttylr09:06:31

the results of which I want in a list

matttylr09:06:54

This must be a common pattern for core.async usage; I just wasn't succeeding in finding any good examples

stijn09:06:50

well, the moment you’re creating that final list, you’ll have to do it in 1 process right?

matttylr09:06:24

yes, there is a consumer of that list

stijn09:06:46

but if you want to do processing on the values of the list, you might consider doing that before you create the list

stijn09:06:08

and then pipelines come in handy

matttylr09:06:09

yes, I run some xpath queries over the results as they come in

matttylr09:06:23

I see what you mean

matttylr09:06:43

I think I was just struggling to visualise the 'structure' of that

matttylr09:06:08

there's the fanout of the queries, the processing of the results and collation of the derived data

stijn09:06:52

typically you also want to control the parallelism in the api calls by issuing them from a pipeline-async and then send the results to a pipeline with a transducer that performs the xpath queries

stijn09:06:21

and at the end create the final list with a timeout

matttylr09:06:28

looks like something along the right lines

slipset09:06:39

There is the canonical core.async example somewhere, hang on

slipset09:06:21

search for fake-search

stijn09:06:51

;; n1/n2 control parallelism; perform-api-request, query-xf
;; and requests are placeholders for the real API call,
;; the xpath transducer and the input collection
(let [in            (chan)
      responses     (chan)
      query-results (chan)]
  (pipeline-async n1 responses perform-api-request in)
  (pipeline n2 query-results query-xf responses)
  (onto-chan in requests)
  (go-loop [results []]
    (if-some [v (<! query-results)]
      (recur (conj results v))
      results)))

matttylr09:06:39

thanks stijn & slipset for those examples