#clojure
2024-07-10
escherize00:07:42

I have about 5000 io bound requests that I want to do in parallel. I don’t want to use pmap for parallelizing io bound tasks. I want to keep the thread spawning to a constant number (so no mapping future, then mapping deref). Core async is off the table too. What would you use? I’m thinking an executor service and a queue, but not sure

ghaskins01:07:43

If you can leverage JDK21+, check out the promesa vthread executor

seancorfield01:07:17

It's pretty straightforward to just use Java interop -- no need to drag in Promesa just for that (it has other stuff that's good).

👍 2
ghaskins01:07:48

@U04V70XH6 you are, of course, right. I wouldn’t bring in all of promesa just for the executor wrapper. What I was thinking but articulated poorly was that the OP could probably build a decent solution for the overall problem with a variety of promesa building blocks, including the vthread executor

seancorfield01:07:55

(Executors/newVirtualThreadPerTaskExecutor) gets you the vthread executor, then (.submit exec #(some-thing :here))
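
A minimal sketch of that interop, assuming JDK 21+ for virtual threads. The name vthread-map is hypothetical, not something from the thread. The Callable hint is there so the call resolves to submit(Callable) and the Future carries the return value.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors Future))

(defn vthread-map
  "Hypothetical helper: runs (f x) for each x on its own virtual thread
  and returns the results in input order. Assumes JDK 21+."
  [f coll]
  (let [^ExecutorService exec (Executors/newVirtualThreadPerTaskExecutor)]
    (try
      (let [futures (mapv (fn [x]
                            ;; Hint as Callable so submit(Callable) is chosen
                            ;; and the Future holds the function's result.
                            (let [^Callable task (fn [] (f x))]
                              (.submit exec task)))
                          coll)]
        ;; .get blocks until each task is done; rethrows task failures
        ;; wrapped in an ExecutionException.
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        ;; No new tasks are accepted after this; submitted ones still finish.
        (.shutdown exec)))))
```

Something like (vthread-map fetch-url urls) would then block until every request is done, where fetch-url and urls are placeholders for your own function and inputs.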

seancorfield01:07:51

Using vthreads would mean you didn't need to worry about the number of spawned threads too.

seancorfield01:07:59

@U13AR6ME1 True, yes, there's a nice vthread-based version of core.async in Promesa which I've played with and have considered for use at work...

ghaskins01:07:57

Yeah, if you are not concerned with managing io pressure on the downstream service, I’d just blast away with a simple vthread executor. If you need to constrain max concurrent requests then other constructs can be added on top.

didibus03:07:26

Executor Completion Service comes with a queue internally, so you don't need a queue, it handles all that for you.

didibus03:07:31

You can also do this with a virtual thread executor, but then you have to use a semaphore inside each task to limit the concurrency. You acquire a semaphore, then perform the io, and release the semaphore in a finally block.

didibus03:07:50

If you don't care to handle results as they complete, you don't need a completion service. And you can actually use the semaphore trick with future and mapping deref:

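(defn await-io
  [input-coll io-fn n]
  (let [sem (Semaphore. n false)
        ;; doall starts every future before the first deref; a lazy `for`
        ;; could otherwise start tasks only as mapv walks the sequence
        res (doall
             (for [e input-coll]
               (do (.acquire sem)
                   (future (try (io-fn e) (finally (.release sem)))))))]
    (mapv deref res)))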
This keeps at most n futures in flight at a time, so roughly n threads are doing IO at once (one future is still created per input element overall). It waits for one of the running futures to finish before firing off another. Finally it waits for all the IO to be done and returns a vector of the results in the same order as you submitted them.

didibus04:07:40

Well, and with a little more effort, you can also make this work in completion order by just adding in a LinkedBlockingQueue:

(defn completion-io
  [input-coll io-fn n]
  (let [sem (Semaphore. n false)
        results (LinkedBlockingQueue.)]
    (future
      (doseq [e input-coll]
        (.acquire sem)
        (future
          (try
            (.put results (io-fn e))
            (catch Exception ex
              (.put results ex))
            (finally
              (.release sem))))))
    results))
Now you can call take/poll on the return value of completion-io to retrieve results in completion order. It's probably re-implementing a bit of what ExecutorCompletionService does, but at the same time it's arguably less code overall and cleaner looking :man-shrugging:

seancorfield04:07:27

I think the real takeaway here is that you have a lot of fairly simple options using just Java interop -- which is really one of the big wins with Clojure!

💯 3
didibus04:07:19

@U04V70XH6 Is your previous example using Clojure 1.12? Don't you need to reify when submitting to the executor?

seancorfield04:07:53

No, Clojure functions are Callable.

seancorfield04:07:35

That's actually old code from work that's been running for... years... although we did have a submit wrapper that type-hinted the args.

didibus04:07:14

Ah cool. So ya the completion-service is quite trivial to use then:

(defn completion-io
  [input-coll io-fn n]
  (let [executor (Executors/newFixedThreadPool n)
        completion-service (ExecutorCompletionService. executor)]
    (doseq [e input-coll]
      (.submit completion-service #(io-fn e)))
    completion-service))
Short and sweet.

seancorfield04:07:42

(we actually have a protocol around it so we can compose "named" executors and incorporate logging for background thread but the core is just (.submit exec some-clj-fn))

seancorfield04:07:49

I'm really excited about virtual threads -- now MySQL JDBC 9.0 has dropped -- and we're starting to switch future and various other constructs over to use vthreads at work.

seancorfield04:07:18

(MySQL 9.0 rewrites all the synchronized methods to use ReentrantLock to be compatible with vthreads)

didibus04:07:05

Ya, the vthread version I think is just:

(defn completion-io
  [input-coll io-fn n]
  (let [executor (Executors/newVirtualThreadPerTaskExecutor)
        completion-service (ExecutorCompletionService. executor)
        sem (Semaphore. n false)]
    (doseq [e input-coll]
      (.submit completion-service
               ;; acquire before the try so a failed acquire never triggers a release
               #(do (.acquire sem)
                    (try (io-fn e)
                         (finally (.release sem))))))
    completion-service))

seancorfield04:07:24

What does the completion service buy you? (I've never needed it)

didibus04:07:40

It waits for the first to complete. So it lets you handle results as they complete.

didibus04:07:59

It's kind of like alt! in core.async.

seancorfield04:07:06

You still have to deref the result tho', right?

seancorfield04:07:00

You call .take on the service but you still get a Future...

didibus04:07:25

Hum, well it will be a Java future, but you can get from it right away. The future taken is guaranteed to have a value available. It returns a future so you still get it to throw when you call get on it.

seancorfield04:07:40

So the benefit is you get results as they complete (as opposed to the original collection order)

seancorfield04:07:08

.take is blocking -- "waiting if none are yet present."

seancorfield04:07:43

So, no, you cannot "get from it right away". Only as each result actually completes.

didibus04:07:12

Ya, take will block until any of them completes. First one to complete, it unblocks and you can .get the value (or it throws if it contained an exception). You can also use poll which is non-blocking (or give it a timeout so it blocks for N time). It's confusing, but .poll also will remove the element from the queue if successful.
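
A rough sketch of the consuming side described above, assuming a fixed pool of n threads. The name results-in-completion-order is hypothetical, and returning a failure's cause as a value (instead of letting .get throw) is a choice made only for this illustration.

```clojure
(import '(java.util.concurrent Callable ExecutionException
                               ExecutorCompletionService ExecutorService
                               Executors Future))

(defn results-in-completion-order
  "Hypothetical helper: runs (io-fn e) for each element on a pool of n
  threads and returns a vector of results in the order they completed."
  [io-fn input-coll n]
  (let [^ExecutorService executor (Executors/newFixedThreadPool n)
        service (ExecutorCompletionService. executor)
        items (vec input-coll)]
    (try
      (doseq [e items]
        (let [^Callable task (fn [] (io-fn e))]
          (.submit service task)))
      ;; One .take per submitted task. .take blocks until some task has
      ;; finished; the Future it returns is already done, so .get does not wait.
      (mapv (fn [_]
              (let [^Future fut (.take service)]
                (try
                  (.get fut)
                  (catch ExecutionException ex
                    ;; Assumption for this sketch: return the cause as a value.
                    (.getCause ex)))))
            items)
      (finally
        (.shutdown executor)))))
```

The caller still blocks on .take, as discussed, but each result can be handled as soon as it is available rather than in input order.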

didibus04:07:44

I mean after .take returns a future, that future is going to be realized, so the .get call to the future won't block.

seancorfield04:07:13

Sure, but .take itself blocks.

seancorfield04:07:21

You can't get away from blocking here.

didibus04:07:28

Well, you can use .poll

seancorfield04:07:39

And spin on nil...

didibus04:07:50

Ya. There's no "event callback" here.

seancorfield04:07:00

(`.poll` also returns a Future so you still have to deref it)

seancorfield04:07:09

So CompletionService adds a layer of complexity but has the benefit of returning results "as fast as possible", but loses collection order (if that mattered).

👍 1
didibus04:07:14

Ya, .take and .poll returning a future is annoying. But I guess it was for error handling, cause in Java nobody expects an exception as a value on the queue. So .get throws if the future contains an exception. I guess they could have made .take throw, not sure what the rationale was.

didibus04:07:46

That's how I understood it ya. It's when order doesn't matter, and you want to process results as soon as possible.

didibus04:07:50

If you use a "callback" instead, like have the submitted task call the callback with the IO result, the callback will also run on that thread, so that will delay the next IO from starting. Unless you have your callbacks also sent to another executor, etc. But I think this is where CompletionService lets the result be handled on the thread that submitted, or you can spawn another thread to handle all results as they complete.

didibus05:07:18

That said, I don't think it would be that hard, and at least in Clojure passing in a callback might be nicer.

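(defn completion-io
  [input-coll io-fn callback n]
  (let [io-executor (Executors/newFixedThreadPool n)
        callback-executor (Executors/newCachedThreadPool)]
    (doseq [e input-coll]
      (.submit io-executor
               (fn []
                 ;; run the IO on the bounded pool so at most n run at once,
                 ;; then hand the result (or the exception) to the callback pool
                 (let [result (try
                                (io-fn e)
                                (catch Exception ex
                                  ex))]
                   (.submit callback-executor
                            (fn [] (callback result)))))))))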

didibus05:07:27

FYI, I did not test all these examples haha

👍 1
Hendrik05:07:27

maybe https://github.com/leonoel/missionary is a good fit for this usecase :)

igrishaev07:07:20

If your requests are IO-heavy, then virtual threads might be a good fit. You can try https://github.com/igrishaev/virtuoso which is a thin wrapper on top of virtual threads. There is a pmap! function which acts like pmap but uses a virtual thread pool

Noah Bogart14:07:53

I use this, which blocks until everything is done and returns things in the order provided:

(defn pmap*
  [f coll]
  (let [executor (Executors/newCachedThreadPool)
        futures (mapv #(.submit executor (reify Callable (call [_] (f %)))) coll)
        ret (mapv #(.get ^Future %) futures)]
    (.shutdownNow executor)
    ret))

Noah Bogart14:07:06

switching to Executors/newFixedThreadPool handles the thread spawning number
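
A minimal sketch of that bounded variant, using invokeAll instead of individual submit calls. The name bounded-pmap and its argument order are hypothetical; only the switch to Executors/newFixedThreadPool comes from the message above.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors Future))

(defn bounded-pmap
  "Hypothetical helper: like pmap*, but runs at most n tasks at a time.
  Blocks until everything is done and returns results in input order."
  [n f coll]
  (let [^ExecutorService executor (Executors/newFixedThreadPool n)]
    (try
      (let [tasks (mapv (fn [x] (reify Callable (call [_] (f x)))) coll)
            ;; invokeAll blocks until every task has completed and returns
            ;; the Futures in the same order as the tasks.
            futures (.invokeAll executor tasks)]
        ;; .get rethrows a task's failure wrapped in an ExecutionException.
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        (.shutdown executor)))))
```

With 5000 requests and n set to, say, 20, the pool queues the remaining tasks internally, so no semaphore or explicit queue is needed.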

escherize01:07:15

@UEENNMX0T I landed on something extremely close to your implementation. Here's what I came up with after considering all the excellent comments in this thread: https://github.com/metabase/metabase/pull/45256/files#diff-b46449156335348442234e1aa097fbf4bef36e1edc06643608efa29b208694ddR6 Thank you folks!

👍 1
ghaskins00:07:19

@U051GFP2V just for completeness, perhaps also check out the bulkhead facility in promesa: https://funcool.github.io/promesa/latest/bulkhead.html

ghaskins00:07:57

It's probably not exactly what you want because of the behavior at queue limit, but may be of interest to see how it's written

Nikolas Pafitis20:07:06

Is there any support for Clojure on zed editor? Third party plugins?

chandrasekar20:07:58

syntax highlighting and clojure-lsp works but no repl support/plugin available afaik

valerauko06:07:43

there is a clojure extension, not sure what it does though

valerauko06:07:12

on a related note is there a paredit/parinfer extension?

respatialized13:07:29

paredit / parinfer will probably need to be rewritten to use tree-sitter parse trees; I don't know if anyone has undertaken that effort yet