I need to call a server 50k times (a data-processing task that can't be batched) and store the results as a single JSON file at the end. Each request takes ~2–4 s and costs money, which makes errors painful. In theory, something like this (plus some error-handling code) should work:
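```clojure
(require '[cheshire.core :as json]) ; assuming cheshire provides json/generate-string

(->> items
     (pmap http-get-request) ; http-get-request: my own function, one call per item
     (into [])
     (json/generate-string)
     (spit "file.json"))
```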
But I've also been thinking about keeping track of what's left to process, so I can continue from there in case of a catastrophic error.
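One way to keep track of what's left, sketched under assumptions: each item is a map with a unique `:id` (hypothetical), and every finished result is appended to a results file as one EDN map per line. EDN is used here only because it needs nothing beyond Clojure itself; JSON lines via cheshire would work the same way. On restart you read back the ids already recorded and skip them. `done-ids` and `remaining` are illustrative names, not from any library.

```clojure
(ns resume-sketch
  (:require [clojure.edn :as edn]
            [clojure.java.io :as io]
            [clojure.string :as str]))

;; Assumption: items are maps with a unique :id, and each finished result
;; was appended to results-file as one EDN map per line (including :id).

(defn done-ids
  "Return the set of :id values already recorded in results-file,
  or an empty set if the file does not exist yet."
  [results-file]
  (let [f (io/file results-file)]
    (if (.exists f)
      (with-open [r (io/reader f)]
        ;; into #{} is eager, so the whole file is read before it closes
        (->> (line-seq r)
             (remove str/blank?)
             (map (comp :id edn/read-string))
             (into #{})))
      #{})))

(defn remaining
  "Items whose :id has no recorded result yet."
  [items results-file]
  (let [done (done-ids results-file)]
    (remove #(contains? done (:id %)) items)))
```

If the process dies mid-write, the last line may be truncated and `edn/read-string` will throw on it, so a real version would drop or repair that line. Producing the single JSON file is then one final pass over the results file.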
I'm not experienced with core.async, so it might not be the right tool for this. I'm imagining HTTP calls done in parallel (using pmap? 50k go blocks?), with results put into a channel, and at the end a consumer that takes from the channel and appends the results to a file sequentially. If this makes sense to begin with, I'm not sure how to make the HTTP calls in parallel 🤔 Creating 50k go blocks with doseq doesn't sound right, and the server will probably block me (I'm allowed 400 req/s).
If you're on a JVM with virtual threads (vthreads), you may not need core.async for this; you may be able to get away with a fixed-size thread pool, potentially using something like https://github.com/clj-commons/claypoole to run the requests in parallel. A library like https://github.com/brunoV/throttler (disclaimer: I have not used it), which works with both ordinary functions and core.async channels, may help you respect your API's rate limits.
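To illustrate the rate-limiting idea without relying on throttler's API, here is a hand-rolled limiter (`rate-limited` is a hypothetical name, not part of any library) that spaces call start times at least 1/max-per-sec seconds apart across all threads, using an atom to hand out start slots. It caps the request rate only, not how many requests are in flight, so it is meant to sit alongside a bounded thread pool.

```clojure
(ns throttle-sketch)

(defn rate-limited
  "Wrap f so that calls through the returned fn start at most max-per-sec
  times per second in total, even when invoked from many threads.
  Hand-rolled sketch; this is NOT the throttler library's API."
  [f max-per-sec]
  (let [interval-ns (long (/ 1e9 max-per-sec))
        ;; earliest System/nanoTime at which the next call may start
        next-slot   (atom (System/nanoTime))]
    (fn [& args]
      ;; Atomically claim a start slot: now, or interval-ns after the
      ;; previously claimed slot, whichever is later.
      (let [slot    (- (swap! next-slot
                              (fn [t] (+ (max t (System/nanoTime)) interval-ns)))
                       interval-ns)
            wait-ns (- slot (System/nanoTime))]
        (when (pos? wait-ns)
          ;; round up to whole milliseconds so a call never starts early
          (Thread/sleep (long (quot (+ wait-ns 999999) 1000000))))
        (apply f args)))))
```

For example, `(rate-limited http-get-request 400)` would wrap the poster's own request function so that calls start no more than 400 times per second, assuming the server counts requests by when they arrive.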
I would also strongly recommend saving each individual result instead of deferring to the end of your pipeline to persist them.
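A minimal sketch of persisting each result as soon as it arrives, assuming results are plain Clojure data and one EDN line per result is acceptable (`append-result!` is a hypothetical name). A shared lock serializes writers so lines from different threads don't interleave, and reopening the file in append mode for each result means a crash loses at most the line being written.

```clojure
(ns append-sketch
  (:require [clojure.java.io :as io]))

;; One lock shared by every thread that writes results.
(def ^:private write-lock (Object.))

(defn append-result!
  "Append one result to path as a single EDN line. Closing the writer
  (via with-open) flushes the line to disk before the lock is released."
  [path result]
  (locking write-lock
    (with-open [^java.io.Writer w (io/writer path :append true)]
      (.write w (str (pr-str result) "\n")))))
```

Reopening the file per result is cheap next to a 2–4 s HTTP call; at much higher volumes you would keep one writer open and flush after each line instead.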
> I would also strongly recommend saving each individual result instead of deferring to the end of your pipeline to persist them.

@afoltzm my mind is locked into a channel-based solution for bridging this mismatch between async network calls and sequential persistence. Were you thinking about something else, based on your library recommendations? (Claypoole is cool, thanks!)
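Channels aren't strictly required to bridge parallel calls and sequential persistence; a `java.util.concurrent` blocking queue can do the same job. In this sketch (all names hypothetical), a fixed pool of worker threads calls `f` and puts each outcome, success or error-as-data, on a `LinkedBlockingQueue`, while a single writer thread takes from the queue and appends one EDN line per result. A sentinel value tells the writer that all workers have finished.

```clojure
(ns queue-sketch
  (:require [clojure.java.io :as io])
  (:import (java.util.concurrent Executors LinkedBlockingQueue TimeUnit)))

(defn- call-safely
  "Run f on item; turn an exception into data instead of losing it."
  [f item]
  (try {:item item :result (f item)}
       (catch Exception e {:item item :error (.getMessage e)})))

(defn run-pipeline!
  "Call f on every item using a fixed pool of n-workers threads, while one
  writer thread appends each outcome to path as one EDN line.
  Returns the number of lines written."
  [f items n-workers path]
  (let [queue   (LinkedBlockingQueue.)
        written (promise)
        writer  (Thread.
                 ^Runnable
                 (fn []
                   (with-open [^java.io.Writer w (io/writer path :append true)]
                     (loop [n 0]
                       (let [r (.take queue)]          ; blocks until a result arrives
                         (if (= r ::done)              ; sentinel: workers finished
                           (deliver written n)
                           (do (.write w (str (pr-str r) "\n"))
                               (.flush w)              ; persist each result right away
                               (recur (inc n)))))))))
        pool    (Executors/newFixedThreadPool (int n-workers))]
    (.start writer)
    (doseq [item items]
      (.execute pool ^Runnable (fn [] (.put queue (call-safely f item)))))
    (.shutdown pool)                                    ; accept no new tasks
    (.awaitTermination pool Long/MAX_VALUE TimeUnit/NANOSECONDS)
    (.put queue ::done)                                 ; every result is queued by now
    (.join writer)
    @written))
```

Only the writer thread touches the file, so no locking is needed, and the order of lines follows completion order rather than input order.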
I don’t think I would reach for core.async here; just pushing the calls into futures would let you handle or ignore errors individually.
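A sketch of the futures approach, with each call wrapped so that an exception becomes a data map instead of escaping (`call-safely` and `run-with-futures` are illustrative names). Keep in mind that `future` runs on an unbounded thread pool, so starting one per item for 50k items would allow up to 50k concurrent requests; as written, this suits small batches.

```clojure
(ns futures-sketch)

(defn call-safely
  "Call f on item, returning {:item item :result r} on success or
  {:item item :error msg} if f throws, so one failure can't sink the rest."
  [f item]
  (try
    {:item item :result (f item)}
    (catch Exception e
      {:item item :error (.getMessage e)})))

(defn run-with-futures
  "Start one future per item, then wait for all of them. Results come
  back in input order; errors appear as data, never as thrown exceptions."
  [f items]
  (->> items
       (mapv (fn [item] (future (call-safely f item))))
       (mapv deref)))
```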
Oh ha, I misread 50k as 50 :) Yes, I would definitely create an executor service on a fixed-size thread pool to throttle the tasks (whether with claypoole or just with interop).
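With plain interop, the fixed-size pool can be a `java.util.concurrent.ExecutorService` from `Executors/newFixedThreadPool`; `invokeAll` submits every task, waits for all of them, and returns futures in input order. This is a sketch (`pooled-map` is a hypothetical name); claypoole's `pmap` over a pool you create offers a similar ordered parallel map. Note that `invokeAll` only returns once everything has finished, so on its own it does not persist results incrementally.

```clojure
(ns pool-sketch
  (:import (java.util.concurrent ExecutorService Executors Future)))

(defn pooled-map
  "Apply f to every item on a fixed pool of n threads, so at most n calls
  run at once. Returns results in input order. If f throws, .get rethrows
  it wrapped in an ExecutionException."
  [f items n]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int n))]
    (try
      ;; Clojure fns implement Callable, so a vector of thunks can be
      ;; passed straight to invokeAll, which blocks until all complete.
      (let [tasks   (mapv (fn [item] (fn [] (f item))) items)
            futures (.invokeAll pool tasks)]
        (mapv #(.get ^Future %) futures))
      (finally
        (.shutdown pool)))))
```

Sizing arithmetic: at 2–4 s per request, a pool of n threads completes roughly n/4 to n/2 requests per second, so a pool of up to about 800 threads stays at or below 400 req/s as long as calls never finish faster than 2 s. With 100 threads and ~3 s per call, 50,000 calls would take about 50,000 × 3 / 100 = 1,500 s, or 25 minutes.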
FWIW, this type of use case, where reliable processing is needed, is a fairly ideal fit for https://temporal.io/, and I have authored a Clojure SDK for it: https://github.com/manetu/temporal-clojure-sdk
For this use case, I would probably go with a workflow with one activity that runs through the 50k items, using the heartbeat mechanism to checkpoint progress, and returns the completed JSON. The body of the activity can use the approaches suggested above (e.g. a thread pool).
What I would do is not bother with retries at the individual call level. Make a call and return either the valid result or the error turned into data. Then write both the successful and the errored results to the JSON. Then run through the JSON again, re-running only the ones in error, and so on, until they all succeed. First implement this non-concurrently; then, if you care about it running faster, add concurrency.
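The multi-pass approach can be sketched sequentially, as suggested: `attempt` turns each exception into data, and every later pass re-runs only the entries marked as failed. The names are hypothetical, results stay in memory here rather than in the JSON file, and the `max-passes` cap is an addition so that a permanently failing item can't loop forever.

```clojure
(ns passes-sketch)

(defn attempt
  "One call, with any exception turned into data instead of retried."
  [f item]
  (try {:item item :ok true :result (f item)}
       (catch Exception e {:item item :ok false :error (.getMessage e)})))

(defn run-pass
  "Re-run only the failed entries of results; successes are kept as-is."
  [f results]
  (mapv #(if (:ok %) % (attempt f (:item %))) results))

(defn run-until-done
  "One pass over all items, then repeated passes over just the failures,
  stopping when everything succeeded or after max-passes passes."
  [f items max-passes]
  (loop [results (mapv #(attempt f %) items)
         pass    1]
    (if (or (every? :ok results) (>= pass max-passes))
      results
      (recur (run-pass f results) (inc pass)))))
```

In the file-based version you would write `results` out after each pass and read them back before the next; adding concurrency later mostly means replacing the `mapv` calls with a pooled parallel map.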