Ben Sless 07:04:49

I have a question regarding propagating backpressure with promesa. I have created this event function:

(fn [_ rec]
          (->> (p/promise rec)
               (p/map exec deserialize)
               (p/mapcat exec http-request)
               (p/map exec serialize)
               (p/map exec produce)))
And I invoke it in a loop
(loop [recs (poll)]
      (reduce event-fn nil recs)
      (recur (poll)))
(Using reduce because recs is an iterable and I want to go fast.) This implementation does not correctly propagate backpressure to the poll function, which just returns an iterable of data. What did I miss here, and how do I fix it? Thanks, promesa is cool.


If you execute event-fn in a loop without waiting for its result, event-fn can start again before the previous execution ends.


You have two options: use deref at the end to block and wait until it has terminated,


or use promesa.core/loop and promesa.core/recur.
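
A minimal sketch of the two options, assuming promesa is on the classpath. `process` is a hypothetical stand-in for event-fn, not the pipeline from the question.

```clojure
(require '[promesa.core :as p])

;; Hypothetical stand-in for event-fn: returns a promise of the processed record.
(defn process [rec]
  (p/resolved (inc rec)))

;; Option 1: deref blocks the calling thread until each promise completes.
(defn run-blocking [recs]
  (mapv (fn [rec] @(process rec)) recs))

;; Option 2: p/loop returns a promise; each p/recur happens only after the
;; previous step has resolved, so steps never overlap.
(defn run-async [recs]
  (p/loop [remaining (seq recs)
           out       []]
    (if remaining
      (p/let [v (process (first remaining))]
        (p/recur (next remaining) (conj out v)))
      out)))
```

Both variants process one record at a time. The difference is only whether the calling thread blocks (`run-blocking`) or receives a promise of the final result (`run-async`).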



Ben Sless 07:04:50

Makes sense. But I want to get to 100% CPU usage, so I would want to start execution before the previous execution ends, and only block on execution start when all the threads in the exec pool are busy.
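
One way to get "block only when all workers are busy" is a semaphore sized to the pool. The sketch below uses plain java.util.concurrent interop rather than any promesa API. `submit-bounded` and `run-demo` are hypothetical names, and `inc` stands in for the real work.

```clojure
(import '(java.util.concurrent CompletableFuture Executors Semaphore)
        '(java.util.function BiConsumer Supplier))

(defn submit-bounded
  "Blocks the caller until a permit is free, then runs (f x) on exec.
  The permit is released when the task finishes, whether it succeeds or fails."
  [^Semaphore permits exec f x]
  (.acquire permits)
  (-> (CompletableFuture/supplyAsync (reify Supplier (get [_] (f x))) exec)
      (.whenComplete (reify BiConsumer (accept [_ _ _] (.release permits))))))

(defn run-demo []
  (let [n       4
        exec    (Executors/newFixedThreadPool n)
        permits (Semaphore. n)]
    (try
      ;; At most n tasks are in flight; the (n+1)th submit waits for a permit.
      (let [futs (mapv #(submit-bounded permits exec inc %) (range 10))]
        (mapv deref futs))
      (finally (.shutdown exec)))))
```

With the permit count equal to the pool size, the submitting loop keeps every worker busy and stalls only when none is free, which is the backpressure behaviour described above.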

Ben Sless 07:04:07

What I ended up doing was

(loop [recs (poll)]
      @(p/all (reduce event-fn [] recs))
      (recur (poll)))
But it didn't get close to 100% utilization.
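
One thing to check in the snippet above: event-fn as posted ignores its first argument, so reduce would hand p/all only the last promise rather than a collection. A hedged sketch that keeps every promise, assuming promesa is available, with `process` as a hypothetical one-argument stand-in for the pipeline:

```clojure
(require '[promesa.core :as p])

;; Hypothetical stand-in for the real pipeline: one promise per record.
(defn process [rec]
  (p/map inc (p/resolved rec)))

(defn process-batch [recs]
  ;; mapv keeps every promise; p/all resolves once all of them have resolved,
  ;; and the deref blocks the polling loop until the whole batch is done.
  @(p/all (mapv process recs)))
```

This waits for a whole batch before polling again, so throughput is bounded by the slowest record in each batch.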

Ben Sless 07:04:55

What would happen if I provided the executor at the beginning? (p/promise rec exec)

Ben Sless 07:04:22

Or perhaps I should use p/create?


It depends; not reaching 100% can have different causes. In any case, in a normal situation you don't need to pass a custom executor service, and in your case it can cause this negative effect. By default, CompletableFuture resolves all chained computations synchronously in the same thread where the first promise is resolved. Providing a specific executor service will cause additional task spawning, adding more latency and being less efficient.


Spawning a task for each small "partial" computation is useful when you have 100000+ small tasks and you don't want to block a thread with a single chain of computation.


In other words, small tasks enable cooperativeness at the cost of latency.
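
The difference between synchronous chaining and chaining on an executor can be observed directly on a CompletableFuture. The sketch below uses only JDK interop; `thread-name` and `chain-threads` are hypothetical helper names.

```clojure
(import '(java.util.concurrent CompletableFuture Executors)
        '(java.util.function Function))

(defn thread-name [] (.getName (Thread/currentThread)))

(defn chain-threads []
  (let [exec  (Executors/newFixedThreadPool 1)
        src   (CompletableFuture.)
        ;; No executor: the stage runs on whichever thread completes src.
        same  (.thenApply src (reify Function (apply [_ _] (thread-name))))
        ;; With an executor: the stage is submitted to exec as a separate task.
        other (.thenApplyAsync src (reify Function (apply [_ _] (thread-name))) exec)]
    (try
      (.complete src :done)
      {:caller (thread-name) :sync @same :async @other}
      (finally (.shutdown exec)))))
```

Calling `(chain-threads)` should show `:sync` equal to `:caller`, while `:async` names a pool thread. That extra hand-off is the task spawning referred to above.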


In any case, promesa here is just syntactic sugar on top of CompletableFuture; if you have problems with backpressure or with not using 100% CPU, it is probably a design problem rather than a promesa problem.


About your problem: I think using the default thread pool will be more efficient; you just need to use p/all without providing any specific thread pool. But without knowing all the participants (such as how the HTTP request is made, etc.), I can't help more...

Ben Sless 06:04:26

the http request is just a promisify of org.httpkit.client/request