#beginners
2024-05-19
m3tti08:05:22

Is it good practice to use pmap on a side-effecting function, one that does requests for example, or is there a better way?

Ben Sless12:05:54

It is good practice to do anything but use pmap for side-effecting functions. Your options for constraining parallelism range from using an executor service with a fixed-size thread pool, to spawning futures that grab a semaphore, to core.async, to the ideas described here https://andersmurphy.com/2024/05/14/clojure-structured-concurrency-and-scoped-values.html
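For the fixed-size thread pool option, a minimal sketch might look like this (the helper name `run-limited` and the blocking `io-fn` are placeholders, not anything from the thread):

```clojure
(import '(java.util.concurrent Executors Future))

;; Sketch of the fixed-size thread-pool option: at most n tasks run at once.
;; io-fn stands in for whatever blocking request function you have.
(defn run-limited
  [n io-fn coll]
  (let [pool (Executors/newFixedThreadPool n)]
    (try
      ;; invokeAll blocks until every task completes; results keep input order
      (mapv (fn [^Future f] (.get f))
            (.invokeAll pool (mapv (fn [x] (fn [] (io-fn x))) coll)))
      (finally (.shutdown pool)))))
```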

didibus16:05:27

Nope, pmap is for mapping in parallel, when the mapping function is side-effect free.

m3tti16:05:55

ok i'm currently using future and doseq to wait till everything is deref'd

m3tti16:05:06

guess that is a quite naive way, but it should work, right?

didibus16:05:57

Ya. It's not terrible. Each future spawns a thread, each thread blocks on the IO, and you wait until all threads unblock due to IO completion.
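That unbounded pattern is roughly this (a sketch; `io-fn` stands in for the request call, and `fetch-all` is a made-up name):

```clojure
;; One future per element -- fine for small collections, but nothing caps
;; the number of threads created.
(defn fetch-all
  [io-fn coll]
  (let [futs (mapv #(future (io-fn %)) coll)] ; spawn one thread per element
    (mapv deref futs)))                       ; block until every one completes
```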

didibus16:05:56

The only "danger" of that approach is that you don't have limits on the number of threads. So if your sequence contains a million elements, you might create a million threads.

m3tti16:05:32

is there something where i can limit it in a naive approach? i know i could do core.async but somehow it feels overkill

didibus16:05:18

In practice though, I've found it's a lot more robust than you think. Because you probably won't have a million elements in the list, and the IO of the first future might complete by the time you create the next one, so some of the threads would free up again.

didibus16:05:45

I think another naive way that caps it is to apply some batching over it. So like take n number, start a future for those, wait for them all to complete. Once completed, take the next n and repeat. So you do it in batches.
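That batched approach, sketched (the name `await-io-batched` and the `io-fn` argument are placeholders for illustration):

```clojure
;; Process n at a time: start a batch of futures, wait for all of them,
;; then move on to the next batch.
(defn await-io-batched
  [input-coll io-fn n]
  (vec
   (mapcat (fn [batch]
             (->> batch
                  (mapv #(future (io-fn %))) ; start the whole batch
                  (mapv deref)))             ; wait for the batch to drain
           (partition-all n input-coll))))
```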

didibus16:05:45

It's still "naive" because it's slightly suboptimal. Since you don't saturate your limit so that there are always N number active. You do N at a time and do the next N when all the first N complete. But again it's probably good enough if your N = 100, for example. It depends if your use-case truly needs to go as fast as humanly possible or not.

didibus17:05:24

Also, core.async isn't as overkill as you might think.

(require '[clojure.core.async :as a])

(let [coll (range 10)
      output-chan (a/chan (count coll))]
  (a/pipeline-blocking 100
                       output-chan
                       (map blocking-io)
                       (a/to-chan! coll))
  (a/<!! (a/into [] output-chan)))
This is all the code you need, and now you have an efficient non-naive implementation. Pipeline-blocking will make sure to saturate the concurrency level (of 100) that was set. As soon as something completes it'll start another one so that we are always at 100 concurrency. It has a limit, so you don't need to worry about spawning too many threads.

didibus17:05:08

You can make a little util like this:

(defn await-io
  [input-coll io-fn n]
  (let [output-chan (a/chan)]
    (a/pipeline-blocking n
                         output-chan
                         (map io-fn)
                         (a/to-chan! input-coll))
    (a/<!! (a/into [] output-chan))))

(await-io [""
           ""
           ""]
          slurp
          10)

m3tti17:05:55

this looks cool thank you so much

m3tti17:05:24

clojure is just awesome, one of the best lisps i ever used, and one of the nicest communities. thank you !!!

Ben Sless17:05:48

btw, another reason to beware of pmap is that it's lazy, so you'll have double trouble when mixing it with side effects. I even wrote https://bsless.github.io/side-effects/ about all the ways this could melt your code
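A quick way to see that laziness bite: nothing runs until the result is consumed. This is a small illustration based on my reading of pmap's behavior, not code from the thread:

```clojure
;; pmap returns a lazy seq, so the side effects haven't happened yet when
;; the expression returns -- they fire later, as the seq is realized.
(def hits (atom 0))
(def results (pmap (fn [x] (swap! hits inc) x) (range 100)))
;; @hits is still 0 here; only after (doall results) does it reach 100
```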

henrik19:05:59

If it’s doing IO, and you’re on JDK 21, the by far easiest thing to do would be to spawn virtual threads and constrain parallelism with a semaphore.

didibus19:05:47

@U06B8J0AJ That still fails to saturate the concurrency though.

didibus19:05:48

By the way, the output chan doesn't need to be buffered, I forgot that into consumes one element at a time. So no buffer needed

henrik21:05:40

Why would it fail to saturate concurrency? @U0K064KQV

didibus23:05:18

@U06B8J0AJ Well, it depends on the details I guess of exactly how you orchestrate it. But to saturate it, you need a mechanism like alt!! or poll, or you need async completion notifications. So that you're not blocked on all the spawned threads completing, or on them completing in the submitted order, but instead on the first one to complete, so that as soon as one of them completes, you can queue up another.

didibus23:05:05

In "Java" style, I would use the ExecutorCompletionService; you can use vthreads with it. And it lets you poll or take, which will return the next completed task if any. At that moment you can submit another and process the completed one.
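A sketch of that ExecutorCompletionService idea in Clojure (the helper name `run-as-completed` and the `io-fn` argument are illustrative, not from the thread):

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService
                               Callable Future))

;; Submit every task up front, then take results as they complete, so the
;; pool stays saturated. Note: results come back in completion order, not
;; submission order.
(defn run-as-completed
  [n io-fn coll]
  (let [pool (Executors/newFixedThreadPool n)
        ecs  (ExecutorCompletionService. pool)]
    (try
      (doseq [x coll]
        ;; hint Callable so the right submit overload is picked
        (.submit ecs ^Callable (fn [] (io-fn x))))
      (mapv (fn [_] (.get ^Future (.take ecs))) coll)
      (finally (.shutdown pool)))))
```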

didibus00:05:20

pipeline-blocking does this under the hood for us. I have not yet found something simpler than that, in terms of how short the implementation is.

didibus02:05:33

Oh, or maybe you implied, say you have 100k elements to process at 100 concurrency, that you spawn 100k vthreads and have them all block behind the semaphore? I hadn't really thought of that. Are vthreads that free that you can just use them as a queue?

didibus04:05:10

@U06B8J0AJ Hum 🤔, ok so actually, your idea of the semaphore approach I think might be amazing haha. I guess I never really thought of using a semaphore before. I don't even think it matters if you use vthreads or real threads.

(import 'java.util.concurrent.Semaphore)

(defn await-io4
  [input-coll io-fn n]
  (let [sem (Semaphore. n true)
        ;; doall so the futures (and semaphore acquires) happen eagerly,
        ;; not lazily while the mapv below is already dereferencing
        res (doall
             (for [e input-coll]
               (do (.acquire sem)
                   (future (try (io-fn e) (finally (.release sem)))))))]
    (mapv deref res)))
I'm not too sure how this differs from pipeline-blocking. You could argue the pipeline-blocking implementation is still shorter and more straightforward, but this is pretty simple too.

Ben Sless04:05:54

Pipeline maintains order and requires using channels

didibus04:05:21

This one maintains order as well (I did just edit it, got inspired by the code of pipeline and realized I could maintain order in a more straightforward way)

henrik06:05:52

Some helpers to give virtual threads an interface similar to futures.

henrik06:05:02

It’s true that none of this requires virtual threads, but in the case of IO, there’s no reason not to use them. They’re cheap and plentiful.

henrik06:05:54

Btw, using semaphores is not my idea, but the official recommendation from the JVM dev team. You can have millions of virtual threads with no problem, so when there’s no upper bound set by the thread pool, a semaphore is required.
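On JDK 21+, that combination of virtual threads plus a semaphore might look like this (a sketch; the name `vthread-await-io` and the `io-fn` argument are placeholders):

```clojure
(import '(java.util.concurrent Semaphore))

;; One virtual thread per task; the semaphore caps how many run at once.
;; Requires JDK 21+ for Thread/startVirtualThread.
(defn vthread-await-io
  [input-coll io-fn n]
  (let [sem     (Semaphore. n)
        coll    (vec input-coll)
        results (object-array (count coll))
        threads (vec (map-indexed
                      (fn [i x]
                        (Thread/startVirtualThread
                         (fn []
                           (.acquire sem)
                           (try (aset results i (io-fn x))
                                (finally (.release sem))))))
                      coll))]
    (run! (fn [^Thread t] (.join t)) threads)
    (vec results)))
```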

henrik06:05:48

Promesa has a core.async inspired implementation of CSP for channels etc. on top of virtual threads: https://funcool.github.io/promesa/latest/promesa.exec.csp.html. Experimental though.

m3tti06:05:52

Not quite sure if i have virtual threads on babashka. Maybe i've read something wrong

Ben Sless13:05:18

That's the one I should have linked 😛

henrik13:05:17

Btw, note cloneThreadBindingFrame. There might be surprises if it's missing.

didibus21:05:11

I'm still not sure if it's ideal to use them as a queue. Which you're doing in your example. Because you spawn one even for waiting tasks, and have them blocked. I tried to search for info and couldn't really see. Probably for small number of tasks it's fine. But it still seems they consume more memory than queuing up tasks would. And I also couldn't tell if it impacts scheduling in any way if you have thousands or more of them.

Nim Sadeh21:05:11

Quite sure that my usage of routes to break an API into publicly-accessible and authenticated parts is incorrect:

(defn common-middleware
  [app & {:keys
          [top-k
           un-index
           wiki-index
           unsc-index]}]
  (-> app
      (handle-query-params)
      (embed-context {:un-pinecone-conn un-index
                      :wiki-pinecone-conn wiki-index
                      :unsc-pinecone-conn unsc-index
                      :top-k  top-k})
      (request-context)
      (wrap-cors :access-control-allow-origin [#".*"]
                 :access-control-allow-methods [:get :options :put :post :delete])))

(defroutes public-routes
  (GET "/nations" _ (fn [a b c]
                      (handle-get-nations a b c)))
  (POST "/voting-records" _
    (fn [a b c]
      (handle-voting-records-query a b c))))

(defroutes private-routes
  (GET "/test" _ (fn [_ rp _] (rp "Hello world")))
  (POST "/conversation" _ (fn [a b c]
                            (handle-create-conversation a b c)))
  (GET "/async" _ (fn [a b c]
                    (handler-async a b c)))
  (GET "/invoke" _ (fn [a b c]
                     (handle-post-message a b c)))
  (POST "/download-link" _ (fn [a b c]
                             (handle-download-link a b c))))

(defn app
  [& config]
  (routes (->  private-routes
               (authenticate-request)
               (common-middleware config))
          (common-middleware public-routes config)))

(defn -main
  "I don't do a whole lot ... yet."
  [& _]
  (μ/log ::starting-server)
  (System/setProperty "cohere.api.key" (env :cohere-api-key))
  (let [top-k (Integer/parseInt  (env :top-k))
        un-index (un-pinecone-connection)
        wiki-index (wiki-pinecone-connection)
        unsc-index (unsc-pinecone-connection)]
    (run-jetty (app
                :top-k top-k
                :un-index un-index
                :wiki-index wiki-index
                :unsc-index unsc-index)
               {:port 3000
                :async? true
                :join? false})))
My goal is to have public-routes be accessible without authenticating, and private-routes require going through the authentication middleware. In the current setup, hitting public routes requires authentication. If I switch up the order:
(defn app
  [& config]
  (routes
   (common-middleware public-routes config)
   (->  private-routes
        (authenticate-request)
        (common-middleware config))))
then public routes work, but private routes don't make it all the way to their request handlers, just through all the middleware. I am mightily confused about how to make this work and why it's failing the way it is

hiredman21:05:17

The issue is likely how routes compose in compojure

hiredman21:05:53

Which is to try one after another until you get a response

hiredman21:05:00

So when the authorized routes are first, they are tried first, and they return a not-authorized response, so the later routes that don't require authorization are never attempted
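A miniature model of that behavior, using plain functions rather than Compojure itself (all names here are made up for illustration):

```clojure
;; Handlers "decline" a request by returning nil; routing tries each in
;; order until one returns non-nil. A middleware that answers 401 instead
;; of declining therefore stops the fallthrough.
(defn try-routes [& handlers]
  (fn [req] (some #(% req) handlers)))

(def public-h  (fn [req] (when (= "/nations" (:uri req)) {:status 200})))
(def private-h (fn [_]   {:status 401})) ; always answers, never declines

(def app1 (try-routes private-h public-h)) ; 401 shadows the public route
(def app2 (try-routes public-h private-h)) ; public route matches first
```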

Nim Sadeh21:05:05

That makes a ton of sense. What doesn't though, is if I reverse the order, I can see via my logging that the requests don't make it to the handlers in most cases. They do for /test which is a simple return, but don't for /conversation. Note that I didn't change anything about the handler, and it has a log line on the first sexp just saying that the handler has been hit. I can tell that the authentication step is done for all of them. If I remove the public routes, the private routes all work.

hiredman21:05:24

I would double check your logging, and what routes are making requests to

Nim Sadeh21:05:28

I refactored to this:

(defn app
  [& config]
  (-> (routes public-routes (authenticate-request private-routes))
      (common-middleware config)))
And it just worked

hiredman21:05:16

Depending on what your middleware is doing, if it is internally stateful (the session middleware is the most common mw that is) two calls would result in an entirely separate state each, which can break things in odd ways

Nim Sadeh21:05:51

Nothing is stateful, I am not sophisticated enough to use sessions. I have the following mw:
1. parse url query params and wrap in the body
2. embed context/global variables
3. log out the request
4. handle cors