#core-async
2019-02-05
jimbob02:02:09

is this an appropriate usage of pipeline?

(a/pipeline
             (.availableProcessors (Runtime/getRuntime))
             (doto (a/chan) (a/close!))
             (purchase-sender-xf purchase)
             (a/to-chan (range num-to-send)))

jimbob02:02:34

where purchase-sender-xf is a transducer that transforms maps and lastly sends them to SQS?

jimbob02:02:53

confused if i should use pipeline-blocking or pipeline-async instead

hiredman02:02:53

if you aren't going to actually do anything with the results, I don't think pipeline is appropriate

hiredman17:02:57

that isn't a particularly high quality blog post

hiredman17:02:28

pipeline (not pipeline-async or pipeline-blocking) does everything on the go block threadpool, which by default is limited to size 8

hiredman17:02:20

doing blocking io on the go block threadpool is a bad idea because it means other go blocks cannot run on the threadpool while you are blocked on io

jimbob19:02:51

ah that makes sense

jimbob20:02:05

can i still have it in a go block, if the go block dispatches a thread and does a parking take on the thread that is doing the io?

jimbob20:02:29

i suppose that makes sense, considering go blocks are basically FSMs, and once the go block is parked, its thread can be reallocated elsewhere
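That parking-take-on-a-thread pattern might be sketched like this (assuming a core.async dependency; `do-blocking-io` is a hypothetical stand-in for the blocking SQS call):

```clojure
(require '[clojure.core.async :as async])

;; hypothetical stand-in for the blocking SQS call
(defn do-blocking-io [] :sent)

(async/go
  ;; async/thread runs its body on a separate, unbounded thread pool
  ;; and returns a channel that yields the result; <! parks the go
  ;; block here, so the go-pool thread is freed while the io runs
  (let [result (async/<! (async/thread (do-blocking-io)))]
    result))
```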

hiredman02:02:23

limiting pipelines to .available processors doesn't do what you would expect

hiredman02:02:41

doing io as part of a pipeline or an xform on a channel is a bad idea

hiredman02:02:18

you likely just want an executor

jimbob06:02:39

ok, thanks for the answers, I really do very much appreciate it, but any explanations or articles would be preferable. for example: my job is to load test an application, say. this doesn't need to be extremely robust, it just runs once every day or so. i just want to be able to send generated transactions to a queue quickly.

hiredman17:02:39

assuming you are using core.async go blocks in your app, and you are in one of those when you want to send something to the sqs queue, I would do it like (async/<! (async/thread (do-sqs-io-stuff)))

jimbob19:02:17

for transducers, i want to lazily have purchase maps in memory and to parallelize sending them to sqs

jimbob19:02:16

rather: all i want to do is lazily evaluate batches of 10 and then send each batch of 10 to sqs in parallel

jimbob19:02:27

so something like:

(pmap batch-send-txn (map (comp (map (partial mk-event)) (partition-all 10))) (repeat num-to-send {}))

jimbob19:02:28

transduce works:

(transduce (comp (map load.test-generation/make-purchase-event) (take 10)) conj [{}])
=> 
[list of objects]

jimbob19:02:32

but map does not

jimbob20:02:43

i just want to clean this up. it's doing so many pass-throughs and intermediate steps

jimbob20:02:51

(->> (repeat 100 {})
     (map make-purchase-event)
     (partition-all 10)
     (pmap batch-send-txn))
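One way to collapse those intermediate lazy seqs is a single transducer pass with `into` (a sketch; `make-purchase-event` and `batch-send-txn` are hypothetical stand-ins for the fns in the snippet above):

```clojure
;; hypothetical stand-ins for the real fns
(defn make-purchase-event [m] (assoc m :event :purchase))
(defn batch-send-txn [batch] (count batch))

;; one eager pass, no intermediate seqs: map then partition-all
;; composed as a single transducer
(def batches
  (into []
        (comp (map make-purchase-event)
              (partition-all 10))
        (repeat 100 {})))

;; 100 maps in batches of 10 -> 10 batches
(count batches)         ;=> 10
(count (first batches)) ;=> 10
```

The sending step is still separate, since (as noted above) doing io inside a transducer is a bad idea.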

hiredman22:02:04

why the limit of 10?

hiredman22:02:07

if you really must manually limit it to 10 requests in flight at once I would do something like

(def exec (java.util.concurrent.Executors/newFixedThreadPool 10))

(async/go
  (let [ch (async/chan)]
    (.execute exec (fn []
                     (try
                       (do-whatever-io)
                       (finally
                         (async/close! ch)))))
    (async/<! ch)))

hiredman22:02:29

I forget the name of the static factory method on Executors, so I likely got it slightly wrong

hiredman22:02:29

the other problem is that pmap and pipeline both work strictly in order, producing outputs in an order that matches inputs, which is limiting if you don't care about outputs

jimbob23:02:45

ya that makes sense. i do not care about order at all, but do care about aggregating output

hiredman22:02:30

pmap is terrible because it hides a lot of implicit stuff that you almost always want exposed and explicit

hiredman22:02:43

but in general, either your app is cpu limited or io limited. if it is cpu limited, then limiting the threads that do io doesn't matter; if it is io limited, why create an artificial io limit as well? you either have threads in the os scheduler's queue, or callbacks in the executor's queue

hiredman22:02:04

so just do (async/<! (async/thread (do-blocking-io)))

jimbob22:02:24

limit 10 is the sqs limit per call to sendBatchMessage. not limiting to 10 in flight at once, just 10 messages for one API call

hiredman22:02:19

the problem with partition in a transducer is that it has issues when used concurrently, and it will block until it gets a full partition or the channel is closed at the end

jimbob23:02:27

partition-all can include fewer than n items

hiredman22:02:54

typically you would want something that, for example, groups requests 10 at a time, but if no new request comes in some time period, passes through the group regardless of the size
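That group-with-timeout idea could be sketched with `alts!` and a timeout channel (a hedged sketch, assuming a core.async dependency; note the timeout here resets on every received item rather than counting from the start of the batch):

```clojure
(require '[clojure.core.async :as async])

;; collect up to n items from `in` into a batch on `out`, but flush a
;; partial batch if timeout-ms elapses with items pending
(defn batching-chan [in n timeout-ms out]
  (async/go-loop [batch []]
    (let [t (async/timeout timeout-ms)
          [v port] (async/alts! [in t])]
      (cond
        ;; input closed: flush whatever is left, then close out
        (and (= port in) (nil? v))
        (do (when (seq batch) (async/>! out batch))
            (async/close! out))

        ;; got an item: emit the batch once it is full
        (= port in)
        (let [batch (conj batch v)]
          (if (= n (count batch))
            (do (async/>! out batch) (recur []))
            (recur batch)))

        ;; timeout fired: flush a partial batch, if any
        :else
        (do (when (seq batch) (async/>! out batch))
            (recur []))))))
```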

jimbob23:02:35

ah i see. yes, right now i'm using pmap, which i'm realizing after talking with some folks is not great for io, but rather excels at lazy, ordered, cpu-intensive parallelization.

jimbob23:02:44

I’d much prefer a separate thread per io op, so i was considering doing basically exactly what you described, however i'm not sure i need a go block for it. i don't see much downside besides overhead, however

jimbob23:02:27

i definitely want more fine-tuned control over parallelization than what pmap gives me. i'll likely instead just dispatch futures: loop and create n futures, one for each io, and then afterwards aggregate the futures with a kind of (map deref futures) to wait until they are all realized.
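That future-per-batch approach might look like this (a sketch; `batch-send-txn` is a hypothetical stand-in for the SQS batch send):

```clojure
;; hypothetical stand-in for the SQS batch send; returns the batch size
(defn batch-send-txn [batch] (count batch))

;; one future per batch, then deref them all to wait for completion
(def results
  (let [batches (partition-all 10 (repeat 25 {}))
        futs    (mapv #(future (batch-send-txn %)) batches)]
    (mapv deref futs)))

results ;=> [10 10 5]
```

Since Clojure futures run on an unbounded cached thread pool, this gives one thread per in-flight batch, matching the separate-thread-per-io-op idea.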

jimbob23:02:14

was thinking either clojure async futures, or claypoole type futures here: https://github.com/TheClimateCorporation/claypoole

jimbob23:02:10

i'm using java api interop for sqs, so i don't really want to have to add a bunch of interop code to deal with async aws clients and callbacks with java futures

jimbob23:02:21

though that still might be best.