Is it good practice to use pmap on a side-effecting function that does requests, for example, or is there a better way?
It's good practice to do anything but use pmap for side-effecting functions. Your options for constraining parallelism range from using an executor service with a fixed-size thread pool, to spawning futures that grab a semaphore, to using core.async, to the ideas described here: https://andersmurphy.com/2024/05/14/clojure-structured-concurrency-and-scoped-values.html
Nope, pmap is for mapping in parallel, when the mapping function is side-effect free.
Ya. It's not terrible. Each future spawns a thread, each thread blocks on the IO, and you wait until all the threads unblock as their IO completes.
The only "danger" of that approach is that you don't have limits on the number of threads. So if your sequence contains a million elements, you might create a million threads.
Is there something where I can limit it in a naive way? I know I could use core.async, but somehow it feels like overkill.
In practice, though, I've found it's a lot more robust than you'd think. Because you probably won't have a million elements in the list, and the IO of the first future might complete by the time you create the next one, so some of the threads free up again.
I think another naive way that caps it is to apply some batching over it: take n elements, start a future for each, and wait for them all to complete. Once they have, take the next n and repeat. So you do it in batches.
It's still "naive" because it's slightly suboptimal. Since you don't saturate your limit so that there are always N number active. You do N at a time and do the next N when all the first N completes. But again it's probably good enough. If your N = 100 for example. It depends if your use-case truly needs to go as fast as humanly possible or not.
Also, core.async isn't as overkill as you might think.
(require '[clojure.core.async :as a])

(let [coll (range 10)
      output-chan (a/chan (count coll))]
  (a/pipeline-blocking 100
                       output-chan
                       (map blocking-io)
                       (a/to-chan! coll))
  (a/<!! (a/into [] output-chan)))
This is all the code you need, and now you have an efficient non-naive implementation. Pipeline-blocking will make sure to saturate the concurrency level (of 100) that was set. As soon as something completes it'll start another one so that we are always at 100 concurrency.
It has a limit, so you don't need to worry about spawning too many threads. You can make a little util like this:
(defn await-io
  [input-coll io-fn n]
  (let [output-chan (a/chan)]
    (a/pipeline-blocking n
                         output-chan
                         (map io-fn)
                         (a/to-chan! input-coll))
    (a/<!! (a/into [] output-chan))))

(await-io ["" "" ""]
          slurp
          10)
Clojure is just awesome, one of the best Lisps I've ever used, and one of the nicest communities. Thank you!!!
btw, another reason to beware of pmap is that it's lazy, so you'll have double trouble mixing it with side effects. I even wrote https://bsless.github.io/side-effects/ about all the ways this can melt your code
If it’s doing IO, and you’re on JDK 21, the by far easiest thing to do would be to spawn virtual threads and constrain parallelism with a semaphore.
@U06B8J0AJ That still fails to saturate the concurrency though.
By the way, the output chan doesn't need to be buffered, I forgot that into consumes one element at a time. So no buffer needed
Why would it fail to saturate concurrency? @U0K064KQV
@U06B8J0AJ Well, it depends on exactly how you orchestrate it, I guess. But to saturate it, you need a mechanism like alt!! or poll, or you need async completion notifications. That way you're not blocked on all the spawned threads completing, or on them completing in the submitted order; instead you're blocked only on the first one to complete, so that as soon as one of them completes, you can queue up another.
In "Java" style, I would use the ExecutorCompletionService
, you can use vthreads
with it. And it lets you poll
or take
which will return the next completed task if any. And at that moment you can submit another, and process the competed one.
pipeline-blocking
does this under the hood for us. I have not yet found something simpler than that. In terms of how short the implementation is.
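As a rough sketch of that pattern from Clojure (assuming a plain fixed pool; tasks are submitted up front, the pool caps concurrency at n, and take hands back futures in completion order, so results come back unordered):

(import '[java.util.concurrent Executors ExecutorCompletionService])

(defn completion-io [io-fn n coll]
  (let [exec (Executors/newFixedThreadPool n)
        ecs  (ExecutorCompletionService. exec)]
    (try
      (doseq [e coll]
        (.submit ecs ^java.util.concurrent.Callable (fn [] (io-fn e))))
      ;; take blocks until the *next* task finishes, regardless of submit order
      (mapv (fn [_] (.get (.take ecs))) coll)
      (finally (.shutdown exec)))))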
Oh, or maybe you implied, say you have 100k elements to process at 100 concurrency, that you spawn 100k vthreads and have them all block behind the semaphore? I hadn't really thought of that. Are vthreads so cheap that you can just use them as a queue?
@U06B8J0AJ Hum, ok, so actually, your idea of the semaphore approach might be amazing haha. I guess I never really thought of using a semaphore before. I don't even think it matters if you use vthreads or real threads.
(import '[java.util.concurrent Semaphore])

(defn await-io4
  [input-coll io-fn n]
  (let [sem (Semaphore. n true)
        res (for [e input-coll]
              (do (.acquire sem)
                  (future (try (io-fn e) (finally (.release sem))))))]
    (mapv deref res)))
I'm not too sure how this differs from pipeline-blocking. You could argue the pipeline-blocking implementation is still shorter and more straightforward, but this is pretty simple too. This one maintains order as well (I did just edit it; I got inspired by the code of pipeline and realized I could maintain order in a more straightforward way).
It’s true that none of this requires virtual threads, but in the case of IO, there’s no reason not to use them. They’re cheap and plentiful.
Btw, using semaphores is not my idea, but the official recommendation from the JVM dev team. You can have millions of virtual threads with no problem, so when there’s no upper bound set by the thread pool, a semaphore is required.
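A sketch of that recommendation from Clojure (requires JDK 21+; one virtual thread per element, concurrency capped by the semaphore, with promises preserving input order — the function name is made up here):

(import '[java.util.concurrent Semaphore])

(defn await-io-virtual [input-coll io-fn n]
  (let [sem (Semaphore. n)
        results (mapv (fn [e]
                        (let [p (promise)]
                          (Thread/startVirtualThread
                            (fn []
                              (.acquire sem)          ; at most n run at once
                              (try (deliver p (io-fn e))
                                   (finally (.release sem)))))
                          p))
                      input-coll)]
    (mapv deref results)))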
Promesa has a core.async inspired implementation of CSP for channels etc. on top of virtual threads: https://funcool.github.io/promesa/latest/promesa.exec.csp.html. Experimental though.
virtual threads and semaphores sounded familiar: https://andersmurphy.com/2024/05/06/clojure-managing-throughput-with-virtual-threads.html
I'm still not sure it's ideal to use them as a queue, which is what you're doing in your example, because you spawn one even for waiting tasks and leave them blocked. I tried to search for info and couldn't really find any. For a small number of tasks it's probably fine, but they still seem to consume more memory than queuing up tasks would, and I also couldn't tell whether having thousands or more of them impacts scheduling in any way.
Quite sure that my usage of routes to break an API into publicly-accessible and authenticated sections is incorrect:
(defn common-middleware
[app & {:keys
[top-k
un-index
wiki-index
unsc-index]}]
(-> app
(handle-query-params)
(embed-context {:un-pinecone-conn un-index
:wiki-pinecone-conn wiki-index
:unsc-pinecone-conn unsc-index
:top-k top-k})
(request-context)
(wrap-cors :access-control-allow-origin [#".*"]
:access-control-allow-methods [:get :options :put :post :delete])))
(defroutes public-routes
(GET "/nations" _ (fn [a b c]
(handle-get-nations a b c)))
(POST "/voting-records" _
(fn [a b c]
(handle-voting-records-query a b c))))
(defroutes private-routes
(GET "/test" _ (fn [_ rp _] (rp "Hello world")))
(POST "/conversation" _ (fn [a b c]
(handle-create-conversation a b c)))
(GET "/async" _ (fn [a b c]
(handler-async a b c)))
(GET "/invoke" _ (fn [a b c]
(handle-post-message a b c)))
(POST "/download-link" _ (fn [a b c]
(handle-download-link a b c))))
(defn app
[& config]
(routes (-> private-routes
(authenticate-request)
(common-middleware config))
(common-middleware public-routes config)))
(defn -main
"I don't do a whole lot ... yet."
[& _]
(μ/log ::starting-server)
(System/setProperty "cohere.api.key" (env :cohere-api-key))
(let [top-k (Integer/parseInt (env :top-k))
un-index (un-pinecone-connection)
wiki-index (wiki-pinecone-connection)
unsc-index (unsc-pinecone-connection)]
(run-jetty (app
:top-k top-k
:un-index un-index
:wiki-index wiki-index
:unsc-index unsc-index)
{:port 3000
:async? true
:join? false})))
My goal is to have public-routes be accessible without authenticating, and private-routes require going through the authentication middleware.
In the current setup, hitting public routes requires authentication. If I switch up the order:
(defn app
[& config]
(routes
(common-middleware public-routes config)
(-> private-routes
(authenticate-request)
(common-middleware config))))
then public routes work, but private routes don't make it all the way to their request handlers, just through all the middleware. I am mightily confused about how to make this work and why it's erroring the way that it is.
So when the authorized routes are first, they are tried first, and they return a not-authorized response, so the later routes that don't require authorization are never attempted.
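The mechanism can be seen with a minimal sketch (hypothetical names, not the actual Compojure internals): routes-style dispatch tries each handler in order and returns the first non-nil response, so an auth wrapper that answers 401 instead of returning nil swallows requests meant for later routes.

(defn try-routes [& handlers]
  (fn [req]
    (some #(% req) handlers)))   ; first non-nil response wins

(defn wrap-auth [handler]
  (fn [req]
    (if (:authorized? req)
      (handler req)
      {:status 401})))           ; non-nil, so later routes are never tried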
That makes a ton of sense. What doesn't, though, is that if I reverse the order, I can see via my logging that the requests don't make it to the handlers in most cases. They do for /test, which is a simple return, but don't for /conversation. Note that I didn't change anything about the handler, and it has a log line on the first sexp just saying that the handler has been hit. I can tell that the authentication step is done for all of them. If I remove the public routes, the private routes all work.
I refactored to this:
(defn app
[& config]
(-> (routes public-routes (authenticate-request private-routes))
(common-middleware config)))
And it just worked