Wrote up a 'poor man's dataloader'. Would love your thoughts!
Context: a user can make N concurrent database requests:
SELECT * FROM users WHERE id = 1
SELECT * FROM users WHERE id = 2
(these are analogous to get-one)
But we want to batch them together instead:
SELECT * FROM users WHERE id IN (1, 2)
(this is analogous to get-batched)
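One detail the batched query has to handle: `run-batch!` below pairs results with promises by position, so `get-batched` must return exactly one result per input, in input order. `IN (...)` returns rows in no guaranteed order and simply omits ids that don't exist. A minimal sketch, assuming rows come back as maps with an `:id` key; `users-by-ids-sqlvec` and `align-rows` are hypothetical helper names, and the `[sql & params]` vector is the shape accepted by clojure.java.jdbc and next.jdbc:

```clojure
(ns example.users-sql
  (:require [clojure.string :as str]))

;; Hypothetical helper: one parameterized query for a whole batch of ids.
;; Assumes ids is non-empty (IN () is not valid SQL).
(defn users-by-ids-sqlvec [ids]
  (into [(str "SELECT * FROM users WHERE id IN ("
              (str/join ", " (repeat (count ids) "?"))
              ")")]
        ids))

;; Hypothetical helper: re-key rows by :id and emit one entry per requested id,
;; in request order, with nil for ids that returned no row.
(defn align-rows [ids rows]
  (let [by-id (into {} (map (juxt :id identity)) rows)]
    (mapv by-id ids)))
```

For example, `(users-by-ids-sqlvec [1 2])` yields `["SELECT * FROM users WHERE id IN (?, ?)" 1 2]`, and `align-rows` turns rows returned as `[{:id 3} {:id 1}]` for ids `[1 2 3]` into `[{:id 1} nil {:id 3}]`.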
Here's how I achieve it:
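(def loader (atom {:requests [] :schedule-delay nil}))
;; Stand-in for the real batched query: must return one result per input, in input order.
(defn get-batched [inputs]
  (println "batching!" inputs)
  (mapv (fn [id] (keyword (str "r-" id)))
        inputs))
(defn run-batch! []
  ;; Atomically take every pending request and reset the loader for the next batch.
  (let [[old _] (reset-vals! loader {:requests [] :schedule-delay nil})
        requests (:requests old)
        result-promises (mapv :result-promise requests)
        ;; If the batch call throws, hand the exception to every waiter
        ;; instead of leaving them blocked on their promises forever.
        results (try
                  (get-batched (mapv :input requests))
                  (catch Throwable t
                    (repeat t)))]
    (doseq [[p r] (map vector result-promises results)]
      (deliver p r))))
(defn schedule [request]
  ;; Enqueue the request. The first request of a batch also creates a delay that,
  ;; when forced, starts a virtual-thread future (ua/vfuture) which waits 5 ms
  ;; and then runs the batch. swap! may retry its fn, but an unforced delay has
  ;; no side effects, so only the delay actually stored in the atom ever starts a future.
  (let [{:keys [schedule-delay]}
        (swap! loader (fn [{:keys [requests schedule-delay]}]
                        {:requests (conj requests request)
                         :schedule-delay (or schedule-delay
                                             (delay
                                               (ua/vfuture
                                                 (Thread/sleep 5)
                                                 (run-batch!))))}))]
    @schedule-delay))
(defn get-one [input]
  (let [p (promise)]
    (schedule {:input input :result-promise p})
    ;; Block until run-batch! delivers this input's result; rethrow a delivered exception.
    (let [r @p]
      (if (instance? Throwable r) (throw r) r))))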
(comment
  (pmap (fn [i] (get-one i)) (range 10)))
Here are a few thoughts from skimming:
• It's not clear to me what the goal is. A description of the goal and what each function does would be helpful.
• run-batch! doesn't accept any arguments. I think it's ok for helper functions like this to use globals, but I would implement it so that there's some arity that accepts all the inputs and a zero arity that passes in all the globals.
• I suspect you would be better off using some sort of executor.
See https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Executors.html
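The arity suggestion could look roughly like this: an explicit arity that takes the batch function and the drained requests as arguments (easy to call from a test), and a zero arity that drains the global `loader` and delegates. A sketch reusing the names from the code above (with the `println` dropped); it illustrates the suggestion and is not the author's code:

```clojure
(ns example.run-batch)

(def loader (atom {:requests [] :schedule-delay nil}))

(defn get-batched [inputs]
  (mapv (fn [id] (keyword (str "r-" id))) inputs))

(defn run-batch!
  ;; Zero arity: drain the global loader, then delegate to the explicit arity.
  ([]
   (let [[old _] (reset-vals! loader {:requests [] :schedule-delay nil})]
     (run-batch! get-batched (:requests old))))
  ;; Explicit arity: everything it needs is passed in, so no globals are touched.
  ([batch-fn requests]
   (let [results (batch-fn (mapv :input requests))]
     (doseq [[{p :result-promise} r] (map vector requests results)]
       (deliver p r)))))
```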
Thanks for looking @smith.adriane! I updated the description to explain the problem & the goal a bit more. I also noted that I am running the future inside a virtual thread executor.
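On Java 21+, a virtual-thread future can be built from plain JDK interop. A minimal sketch of a `vfuture`-style macro; `vfuture`, `vfuture-call` and the namespace are hypothetical names, since the author's actual `ua/vfuture` isn't shown in the thread:

```clojure
(ns example.vfuture
  (:import (java.util.concurrent Callable ExecutorService Executors)))

;; One shared executor that starts a new virtual thread per task (Java 21+).
(defonce ^ExecutorService virtual-executor
  (Executors/newVirtualThreadPerTaskExecutor))

;; Hypothetical helper: runs f on a virtual thread and returns a
;; java.util.concurrent.Future, which clojure.core/deref can block on.
;; The Callable hint selects the submit overload that returns f's result.
(defn vfuture-call [f]
  (.submit virtual-executor ^Callable f))

;; Hypothetical stand-in for ua/vfuture: like clojure.core/future, but on a virtual thread.
(defmacro vfuture [& body]
  `(vfuture-call (fn [] ~@body)))
```

Because the result is a plain `java.util.concurrent.Future`, the timeout form `(deref f 100 :timed-out)` works on it too.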
why not just use a SingleThreadExecutor? Why is there a 5ms delay being added?
There's also some coupling between the task data and the task runner. run-biatch! expects an :input key, which it internally calls ids. Afterwards, the ids are transformed into vs? All of that is confusing to me. Is there any reason not to use a consistent name?
It also seems like results are being held inside the loader atom forever.
> why not just use a SingleThreadExecutor? Why is there a 5ms delay being added?
My goal is that if someone runs:
(pmap (fn [i] (get-one i)) (range 10))
I would want get-batched to be called once with [0 1 2 3 4 5 6 7 8 9]
I am not sure how this would work with a single-threaded executor. Would love to learn more!
I haven't actually tested this code, but it would look something like:
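;; my-executor, do-work and inputs are placeholders for this sketch
(def my-executor (java.util.concurrent.Executors/newSingleThreadExecutor))
(def futures
  (mapv (fn [input]
          ;; the Callable hint selects the submit overload whose Future returns do-work's result
          (.submit ^java.util.concurrent.ExecutorService my-executor
                   ^java.util.concurrent.Callable (fn [] (do-work input))))
        inputs))
(def first-result @(first futures))
If my-executor is a SingleThreadExecutor, jobs are executed one at a time, in order, on a single thread.
(.submit my-executor a-fn) returns a Future, which can be dereferenced.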
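To see the one-at-a-time, in-order behavior, here is a small self-contained check: five jobs submitted to a `newSingleThreadExecutor` record their order and the thread they ran on. `single`, `run-log` and `results` are illustrative names. Note that the executor serializes the jobs; it does not merge them into a single call.

```clojure
(ns example.single-thread
  (:import (java.util.concurrent Callable ExecutorService Executors)))

(def ^ExecutorService single (Executors/newSingleThreadExecutor))

;; Each job appends its index, so the log shows the execution order.
(def run-log (atom []))

(def futures
  (mapv (fn [i]
          (.submit single ^Callable (fn []
                                      (swap! run-log conj i)
                                      [i (.getName (Thread/currentThread))])))
        (range 5)))

;; deref blocks until each job is done; each result is [index thread-name].
(def results (mapv deref futures))

(.shutdown single)
```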
(I went ahead and updated the code to use more consistent variable names)
Is do-work in that example the function get-one?
get-one schedules something, whereas do-work actually accomplishes a single task
you could run (do-work my-input) directly and it wouldn't use the executor
In the code above, what I would like to do is 'batch' all the tasks into one function call.
This is for cases where a user makes N concurrent database requests:
SELECT * FROM users WHERE id = 1
SELECT * FROM users WHERE id = 2
(these are analogous to `get-one`)
But we instead run:
SELECT * FROM users WHERE id IN (1, 2)
(this is analogous to get-batched)
I may be misunderstanding what you mean by do-work and input here.
Ah. I did not realize that was the goal.
There are multiple ways to do this, but I would use core.async since I'm familiar with it and it could be expressed naturally.
Updated the post to try to state the goal better 🫡. If you end up outlining a solution with core.async, I would love to see it! I was thinking about core.async too, but wasn't sure how to express it more naturally than this.
The updated problem statement is great! 👍
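Since core.async came up, here is one way the batching could be expressed with it. A minimal sketch, assuming `org.clojure/core.async` is on the classpath; `start-batcher`, `load-one`, `:max-wait-ms` and `:max-size` are hypothetical names. It uses `a/thread` rather than a `go` block because `batch-fn` (a database call) blocks. Like the original, the 5 ms window starts when the first request of a batch arrives.

```clojure
(ns example.batcher
  (:require [clojure.core.async :as a]))

(defn start-batcher
  "Returns a channel of [input result-promise] requests. After the first request
  of a batch arrives, keeps collecting for max-wait-ms (or until max-size requests),
  then calls batch-fn once with all inputs and delivers results by position."
  [batch-fn {:keys [max-wait-ms max-size] :or {max-wait-ms 5 max-size 100}}]
  (let [requests (a/chan 1024)]
    (a/thread
      (loop []
        ;; <!! returns nil once the channel is closed, which ends the loop.
        (when-let [first-req (a/<!! requests)]
          (let [deadline (a/timeout max-wait-ms)
                batch (loop [batch [first-req]]
                        (if (>= (count batch) max-size)
                          batch
                          (let [[req port] (a/alts!! [requests deadline])]
                            (if (and (= port requests) (some? req))
                              (recur (conj batch req))
                              batch))))
                results (try
                          (batch-fn (mapv first batch))
                          ;; Hand a failure to every waiter instead of leaving them blocked.
                          (catch Throwable t (repeat t)))]
            (doseq [[[_ p] r] (map vector batch results)]
              (deliver p r))
            (recur)))))
    requests))

(defn load-one
  "Sends one input to the batcher and blocks until its result is delivered."
  [requests input]
  (let [p (promise)]
    (a/>!! requests [input p])
    (let [r @p]
      (if (instance? Throwable r) (throw r) r))))
```

Callers would then run `(pmap #(load-one requests %) (range 10))` against the channel returned by `start-batcher`. The `:max-size` cap keeps a burst of requests from producing an unbounded `IN (...)` list.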