
Does anyone know how to avoid calls to Numbers.num?


I believe if you are doing unchecked math, and your calls are all inlined properly, you will not get any calls to num.


There is a var you can set to get warnings about unchecked math.


This is the code:

(defn smt-8' [times-vec]
  (binding [*unchecked-math* true]
    (loop [res (transient []) pointer-1 (int 0) pointer-2 (int 7)]
      (if-let [end-element (get times-vec pointer-2)]
        (let [start-element (get times-vec pointer-1)
              time-diff (- end-element start-element)]
          (recur
           (if (< time-diff 1000)
             (conj! res [(subvec times-vec pointer-1 (inc pointer-2))])
             res)
           (inc pointer-1)
           (inc pointer-2)))
        (persistent! res)))))


If the unchecked math operation inlines itself, it becomes just a call to a static method, and the compiler, if the types are right, will recognize the method and replace it with just a JVM instruction.


You can't use binding like that


Are you sure? It does halve the time when I set the binding like that 😛


You are setting the var as part of the execution of code from the same compilation unit you are trying to affect.


I.e. that binding doesn't happen until the code has been compiled and executed, and you are trying to use it to affect compilation.


I see, you mean it applies at compilation, so I need to wrap the defn in the binding?


It's weird that it showed a performance gain.


Even wrapping the defn in a binding won't work


You just need to set! the var at the top level, so that is compiled and executed before the form you care about is compiled


Anything you wrap in binding becomes part of the same compilation unit as the binding itself, so the binding cannot affect the compilation of the wrapped thing.


Hum, interesting, I didn't know that


It's kind of annoying, unchecked-math applied globally changes semantics


The only form that doesn't group things together into a compilation unit is do, and binding expands into a try/finally, if I recall.
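
A minimal sketch of the top-level set! approach described above. It assumes the code is loaded from a file or typed at a REPL, where *unchecked-math* is thread-bound and set! is allowed. The names wrapping-inc and checked-inc are made up for illustration.

```clojure
;; Each top-level form is compiled and executed before the next one is
;; compiled, so the flag is already set when the following defn is compiled.
(set! *unchecked-math* true)

(defn wrapping-inc ^long [^long x]
  (inc x))   ; compiled without an overflow check

(set! *unchecked-math* false)

(defn checked-inc ^long [^long x]
  (inc x))   ; compiled with the default overflow check

(wrapping-inc Long/MAX_VALUE)    ; wraps around to Long/MIN_VALUE
;; (checked-inc Long/MAX_VALUE)  ; throws ArithmeticException
```

The second function shows why setting the flag globally changes semantics: the same source text silently wraps on overflow instead of throwing.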


I see, I'll try:

(do
  (set! *unchecked-math* true)
  (defn smt-8' [times-vec]
    (loop [res (transient []) pointer-1 (int 0) pointer-2 (int 7)]
      (if-let [end-element (get times-vec pointer-2)]
        (let [start-element (get times-vec pointer-1)
              time-diff (- end-element start-element)]
          (recur
           (if (< time-diff 1000)
             (conj! res [(subvec times-vec pointer-1 (inc pointer-2))])
             res)
           (inc pointer-1)
           (inc pointer-2)))
        (persistent! res))))
  (set! *unchecked-math* false))


You should use the unchecked versions of the math functions


unchecked-inc-int (or whatever)


Ya, but it's more annoying when you've already written the code 😛


Especially when benchmarking and going back n forth


Ok, so :warn-on-boxed does show that time-diff (- end-element start-element) and (< time-diff 1000) are boxing. But how do I fix it?


It doesn't know the type of start or end element, so of course it doesn't know the type of the diff
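
One way to give the compiler those types, sketched under the assumption that the vector holds long timestamps: coerce each element with long as it comes out of the vector. window-diff is a hypothetical helper, not part of the code above.

```clojure
(defn window-diff
  "Difference between the timestamps at indices j and i of times-vec."
  ^long [times-vec ^long i ^long j]
  (let [start (long (nth times-vec i))   ; unbox once; start is a primitive long
        end   (long (nth times-vec j))]
    (- end start)))                      ; long minus long, no boxed math

(window-diff [100 250 900 1400] 0 2) ;=> 800
```

The elements are still boxed inside the vector. The coercion only ensures that the arithmetic after the lookup works on primitives.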


Ah ya, makes sense. Can (vector-of) infer that? Or still not?


Ok cool, and it won't really do anything except take less memory right? Cause when you get from a vector-of it still boxes?


Maybe I'm reading this wrong:
> While bound to true, compilations of +, -, *, inc, dec and the coercions will be done without overflow checks. While bound to :warn-on-boxed, same behavior as true, and a warning is emitted when compilation uses boxed math. Default: false.
This says that :warn-on-boxed also changes all the default +, -, *, inc, dec etc. to their unchecked variants? So it doesn't only warn.


So it seems that with arrays Clojure can infer the type?


Like on aget
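
A small sketch of the array variant, assuming the timestamps are first copied into a long array. With a ^longs hint, aget returns a primitive long, so the subtraction and comparison need no boxing. count-short-windows is a hypothetical name.

```clojure
(defn count-short-windows
  "Counts index pairs (i, i + span) whose timestamps are less than limit apart."
  ^long [^longs times ^long span ^long limit]
  (let [n (alength times)]
    (loop [i 0, hits 0]
      (let [j (+ i span)]
        (if (< j n)
          ;; aget on a ^longs array yields a primitive long
          (recur (inc i)
                 (if (< (- (aget times j) (aget times i)) limit)
                   (inc hits)
                   hits))
          hits)))))

(count-short-windows (long-array [0 100 200 5000 5100]) 2 1000) ;=> 1
```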


I'm also still getting calls to Numbers.num, hum... albeit fewer of them.


Oh, it might be when putting the numbers into the return vector.


I guess no way to avoid that


Alright, thanks @hiredman that was some good learnings for me

thom07:10:38 will give you a more reliable way of using the primitive ops, in my experience. Always worth leaving warn-on-boxed on though.


Hey, thanks for the info. Sometimes you don't want unchecked math though, so you know if anything has overflowed or not, like Y2K kind of bug.


About that library, is there something it does to enable more primitive math, or it just warns you better when it can't be used?


Hey, I’m wondering what the best way would be to map over a hash map of vectors combining the vector values together, ideally only adding distinct vectors i.e. if I took:

(def sched1 {"Sunday" [["00:00" "24:00"]], "Monday" [["00:00" "24:00"]], "Tuesday" [["00:00" "24:00"]], "Wednesday" [["00:00" "24:00"]], "Thursday" [["00:00" "24:00"]], "Friday" [["00:00" "24:00"]], "Saturday" [["00:00" "24:00"]]})
(def sched2 {"Sunday" [["00:00" "10:00"]], "Monday" [["00:00" "10:00"]]})
they would combine to:
{"Sunday" [["00:00" "24:00"]["00:00" "10:00"]], "Monday" [["00:00" "24:00"]["00:00" "10:00"]], "Tuesday" [["00:00" "24:00"]], "Wednesday" [["00:00" "24:00"]], "Thursday" [["00:00" "24:00"]], "Friday" [["00:00" "24:00"]], "Saturday" [["00:00" "24:00"]]}

Linus Ericsson07:10:00

(merge-with (comp vec concat) map-1 map-2 ....)


Oh sweet, thanks, yeah, that works great. One thing I notice: where it is actually concatenating, it is returning lists rather than vectors, i.e.:

{"Sunday" (["00:00" "24:00"] ["00:00" "10:00"]),
 "Monday" (["00:00" "24:00"] ["00:00" "10:00"]),
 "Tuesday" [["00:00" "24:00"]],
 "Wednesday" [["00:00" "24:00"]],
 "Thursday" [["00:00" "24:00"]],
 "Friday" [["00:00" "24:00"]],
 "Saturday" [["00:00" "24:00"]]}


oh, yeah looks like your update probably fixes that

Linus Ericsson07:10:58

yes, (comp vec concat) returns vectors as you needed.


awesome, thanks so much

Ben Sless15:10:50

You can use into instead of concat, too


another similar problem, coming from a list of maps, how to combine them into one map merging their keys in the same fashion i.e.:

({"Sunday" [["00:00" "10:00"]]}
 {"Sunday" [["12:00" "16:00"]]}
 {"Monday" [["00:00" "10:00"]]}
 {"Tuesday" [["00:00" "10:00"]]}
 {"Wednesday" [["00:00" "10:00"]]}
 {"Thursday" [["00:00" "10:00"]]}
 {"Friday" [["00:00" "10:00"]]}
 {"Saturday" [["00:00" "10:00"]]})
is translated to:


({"Sunday" [["00:00" "10:00"]["12:00" "16:00"]]
  "Monday" [["00:00" "10:00"]]
  "Tuesday" [["00:00" "10:00"]]
  "Wednesday" [["00:00" "10:00"]]
  "Thursday" [["00:00" "10:00"]]
  "Friday" [["00:00" "10:00"]]
  "Saturday" [["00:00" "10:00"]]})


Same reply as in the previous thread, only add apply in front of merge-with. Also, you can replace (comp vec concat) with just into.


user=> (pprint d)
({"Sunday" [["00:00" "10:00"]]}
 {"Sunday" [["12:00" "16:00"]]}
 {"Monday" [["00:00" "10:00"]]}
 {"Tuesday" [["00:00" "10:00"]]}
 {"Wednesday" [["00:00" "10:00"]]}
 {"Thursday" [["00:00" "10:00"]]}
 {"Friday" [["00:00" "10:00"]]}
 {"Saturday" [["00:00" "10:00"]]})
user=> (pprint (apply merge-with into d))
{"Sunday" [["00:00" "10:00"] ["12:00" "16:00"]],
 "Monday" [["00:00" "10:00"]],
 "Tuesday" [["00:00" "10:00"]],
 "Wednesday" [["00:00" "10:00"]],
 "Thursday" [["00:00" "10:00"]],
 "Friday" [["00:00" "10:00"]],
 "Saturday" [["00:00" "10:00"]]}

👍 1

And you can deal with the duplicates by using sets instead of vectors. But if you care about the order of the sub-vectors, then it gets a bit more complex.


yeah works perfectly, thanks!


I had written a function to dedupe after the fact:

(defn f1 [m]
  (reduce (fn [res [k v]]
            (assoc res k (vec (distinct v))))
          {} m))


i’m sure there is a more eloquent way to do it but that seems to work


That one is alright. Two things I'd change:
• Replace reduce + [k v] with reduce-kv + k v.
• If performance is important, consider replacing (vec (distinct v)) with (into [] (distinct) v), but of course measure first.
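
Applied to the function above, the two suggestions might look like this. It is a sketch only; dedupe-vals is a made-up name for the rewritten f1.

```clojure
(defn dedupe-vals
  "Removes duplicate entries from each vector value of m, keeping first-seen order."
  [m]
  (reduce-kv (fn [res k v]
               ;; the distinct transducer feeds straight into a new vector
               (assoc res k (into [] (distinct) v)))
             {}
             m))

(dedupe-vals {"Sunday" [["00:00" "24:00"] ["00:00" "10:00"] ["00:00" "24:00"]]
              "Monday" [["00:00" "10:00"]]})
;=> {"Sunday" [["00:00" "24:00"] ["00:00" "10:00"]], "Monday" [["00:00" "10:00"]]}
```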


Deduping can also be done with sets (`#{}`), although the intent might be less clear with that 🙂


Less clear intent, almost certainly worse performance, and the loss of the original order. :)


reduce-kv is just more performant?


Yes, and the intent is more clear as well.


👍 thanks!


if you wanted to include the dedupe part in the same step, you could always use the distinct transducer?

(let [d [{"Sunday" [["00:00" "10:00"]]}
         {"Sunday" [["12:00" "16:00"]]}
         {"Monday" [["00:00" "10:00"]]}
         {"Tuesday" [["00:00" "10:00"]]}
         {"Wednesday" [["00:00" "10:00"]]}
         {"Thursday" [["00:00" "10:00"]]}
         {"Friday" [["00:00" "10:00"]]}
         {"Saturday" [["00:00" "10:00"]]}]]
  (transduce (comp cat                    ; maps -> map entries
                   (mapcat #(map vector (repeat (key %1)) (val %1))) ; -> [day interval] pairs
                   (distinct))            ; drop repeated pairs
             (completing #(update %1 (first %2) (fnil conj []) (second %2)))
             {}
             d))


or using the xforms lib:

;; assumes (require '[net.cgrand.xforms :as x])
(let [d [{"Sunday" [["00:00" "10:00"]]}
         {"Sunday" [["12:00" "16:00"]]}
         {"Monday" [["00:00" "10:00"]]}
         {"Tuesday" [["00:00" "10:00"]]}
         {"Wednesday" [["00:00" "10:00"]]}
         {"Thursday" [["00:00" "10:00"]]}
         {"Friday" [["00:00" "10:00"]]}
         {"Saturday" [["00:00" "10:00"]]}]]
  (into {}
        (comp cat
              (x/by-key first
                        (comp (mapcat second)
                              (x/into []))))
        d))


I'd argue it's significantly more opaque. But if someone knows xforms already then it might be fine.


Yeah ... that's fair enough ... I'm liking the transducer forms more these days ... but yup ... I can accept this is probably less clear 😉


What would be a good approach to queue HTTP requests in Clojure? Basically I'm hitting a certain HTTP API endpoint and I only want to make 6 concurrent requests at any point. If someone could suggest a good high-level approach/library that helps with it, it'd be very much appreciated! Off the top of my head, I'm thinking of having one atom counter that tracks the current number of parallel requests and spawning off up to 6 futures at a time, incrementing the counter on each occasion (provided the counter is below 6). When each future finishes execution, it decrements the counter. Do you see a flaw with this approach? Is there something better I could do? Thanks!


Wouldn't simply creating a fixed thread pool work here?

👍 1
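
For reference, a fixed thread pool version could look roughly like this. It is a sketch using only java.util.concurrent; run-with-pool and fake-request are hypothetical names, and fake-request stands in for the real HTTP call.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

(defn run-with-pool
  "Runs (f x) for every x in xs on a fixed pool of n threads.
  Blocks until all calls finish and returns the results in input order."
  [n f xs]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (try
      (let [^java.util.Collection tasks (mapv (fn [x] (fn [] (f x))) xs)]
        ;; Clojure fns implement Callable, so invokeAll accepts them directly
        (mapv (fn [^Future fut] (.get fut)) (.invokeAll pool tasks)))
      (finally
        (.shutdown pool)))))

;; placeholder for the real request
(defn fake-request [id]
  (Thread/sleep 50)
  {:id id :status 200})

(run-with-pool 6 fake-request (range 20))
```

At most n calls run at once because the pool never has more than n threads. A long-lived application would more likely create the pool once and reuse it rather than build one per batch.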

@U2FRKM4TW I will have a look, thanks. I'm still new to concurrency approaches in Clojure/Java land.


@U20DT8LSE The “modern” java solution is to just use a semaphore

👍 1

which is exactly the strategy you are describing


How is it better than a fixed pool?


It would work with virtual threads, hypothetically, and it doesn’t require that you need dedicated threads for the api calls - you could do it in say your 50 or so jetty threads


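(import 'java.util.concurrent.Semaphore)

;; no earmuffs: the var is not dynamic
(def stroopwaffle-api-semaphore (Semaphore. 6))

(defn make-api-call []
  ;; acquire outside the try so a failed acquire never triggers a release
  (.acquire stroopwaffle-api-semaphore)
  (try (http/get ...)
       (finally (.release stroopwaffle-api-semaphore))))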


So it's better if and only if you're using some kind of green threads. But with regular threads it's actually worse. Right?


I know we have to do a more complicated strategy to make sure we stay below facebook’s rate limits, so this isn’t the solution


well, it's disconnected from the number of threads


if you have 8 threads and a semaphore of size 8, it's definitely redundant


i’m not sure how it would be worse though


It's worse in terms of the amount of code you have to write and care you have to put in. It's easy to imagine someone using that code to rate limit something while submitting all the tasks to an unbounded thread pool. And now instead of 6 threads you have one semaphore and N threads with N having no upper bound. Whereas a fixed thread pool is literally just one line of code, and it's doing everything for you.


It all depends on the guarantees you want and what your app is doing. If you can get away with just using a thread pool, that's good. But if you want to make sure API calls made while handling HTTP requests respect the limit, wrapping the namespace where the calls are made in a “max 6” guard is more resilient.


like you can imagine starting with a fixed thread pool of 6 and you do a task that talks to this api and does some queries on your db


but the queries take longer than the api call, so you have a pressure to increase the size of the thread pool to do it faster but that hits your api rate limit


Oh, no - querying a DB is an orthogonal concern, let's not mix them together.


Just like you didn't put (query-the-db) under that semaphore, I wouldn't schedule it on the same thread pool.


okay so in that usage pattern the fixed thread pool is definitely worse


since instead of an atomic counter that goes up to 6 and threads wait on it, other threads will have a threadpool of 6 threads they hand data off to, wait on, and then get data back


part of the reason a threadpool can be trouble is that people naturally submit their whole task to it. The distance between the implicit assumption that you will only call some api with some max concurrency and the part of the code where you actually call that api can be large


Assuming I understand you correctly, you're inventing a situation OP hasn't described at all, and neither have I. I'm still operating in the "need to do X requests, with up to Y requests in flight" realm. Anything else depends on everything else. Without further description of the problem I would only add "check out core.async and maybe ExecutorCompletionService".


> you’re inventing a situation OP hasn’t described at all, and neither have I
Kinda, but that's because the fixed thread pool and the semaphore both work. I’m trying to justify when the extra (with-acquire semaphore …) lines of code might be more appropriate, and that relies on context outside of the problem description.

👍 2
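
The (with-acquire semaphore …) form mentioned above is not a built-in. A minimal sketch of how such a macro could be written, with api-permits and call-api as made-up names:

```clojure
(import 'java.util.concurrent.Semaphore)

(defmacro with-acquire
  "Acquires a permit from sem, runs body, and always releases the permit."
  [sem & body]
  `(let [^Semaphore s# ~sem]
     (.acquire s#)            ; outside try: a failed acquire must not release
     (try
       ~@body
       (finally (.release s#)))))

(def api-permits (Semaphore. 6))

(defn call-api [x]
  (with-acquire api-permits
    ;; placeholder for the real request
    (* x x)))
```

Because the limit lives at the call site, it holds no matter which thread or pool ends up calling call-api.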

My specific use case is to handle push notifications in a library that I'm writing. The consumer of the library would call a function in my library (say send-push-notification-async, with a token that specifies the device that must get the push notification) and my library would hit a server that'd send the push notification to the specified device. The library would also allow the consumer to send notifications to several devices if the function above is passed, say, an array of device tokens (could be thousands at a time). The only caveat is that the push notification server/service requires that I call it with at most 6 concurrent requests. So I'd have a queue with all the requests and use this thread pool/semaphore to batch requests to this external service. Hope this gives more context.


In that case:
• Do you want to impose the stateful resource of a thread onto your library users?
• Is max N concurrency on a single machine enough if your users want to use it from the standard 3 API servers?


(see every question about the agent threadpool for how that can be a bother)


my instinct is that api limiting probably shouldn’t be your library’s concern


The library would be stateless as far as the end user is concerned. And yes, the assumption is that the library would be invoked from a single machine (in case of a distributed system, the consumer of the library would have to handle the distributed part).


The library will have to handle rate limiting (that's a requirement), although the size of the threadpool can be passed in as an option by the user.


• The library would be stateless as far as the end user is concerned.
• the size of the threadpool can be passed in as an option by the user.
Pick one.


I was a bit unclear on your question, yes the library will take in an options object that allows the user to specify the size of max_concurrent_requests (but it's very unlikely they'd change the default value of 6).


(def p (push-lib/init {:max-concurrent-requests 6}))
(p/send-push-notification-async [<array-of-notification-objects>])
^ it'd be something like this. But the :max-concurrent-requests param is optional.


What's people's opinions on pros/cons of using core.async/pipeline for this sort of thing? It has settable parallelism and uses a cached thread pool.


It's great when it's appropriate. Which is not always the case given this:
> Outputs will be returned in order relative to the inputs.
I'd also argue that if you don't need the rest of core.async then you should use something else.

👍 1

Also clj-http allows using a connection manager with a fixed number of threads, reducing the burden of doing this by hand.


@U2FRKM4TW in your opinion is having a fixed thread pool a suitable solution here?


The problem description is rather superficial so hard to say for certain. But assuming you don't plan to do much with the results of the requests, then yes, a fixed thread pool is suitable and is the simplest solution. But given what Fredrik said about clj-http, it might be even simpler assuming you're already using that library.

👍 1

@U2FRKM4TW Thanks, I think the return values are indeed required for further processing, so perhaps I'll have to look into alternatives to a thread pool.


I mean honestly you might be able to get pretty far either with clj-http’s built-in connection pooling or even just a mix of partition and future

bubblebobble 1

To me semaphores seem like overkill if all you want to do is only have n requests in flight at a time


The hard part of your problem is not the limit, it's the strategy when someone asks to send one more message but you've hit the max of 6 already.


Unless clj-http already has something similar


You'd do something like:

(import 'java.util.concurrent.Semaphore)

(def http-permits (Semaphore. 6 true)) ; true = fair, waiters are served FIFO

(defn send-msg []
  (.acquire http-permits)
  (try
    (http/post ...)
    (finally (.release http-permits))))


So assuming 6 threads called send-msg, they'd all be allowed to call http/post. While those 6 are still going, a 7th thread that calls send-msg will block. If an 8th thread calls send-msg as well, it too will block, and it will be queued up after thread 7. Once one of the first 6 threads is done, its finally block will release a permit, and thread 7 will be unblocked and allowed to proceed, etc.


The Semaphore manages the count of maximum concurrent calls, and it also manages the blocking/unblocking and queuing up of blocked threads and the order of unblocking them, all for you.


But that's one strategy. If you don't want to block the caller thread, then you'll want some kind of Queue instead. I think core.async would be good for that. You can spawn one process that grabs from a channel and sends the http requests. Have the channel buffer be 6. Now have another channel with a much bigger buffer say of 1000 (if you want to allow up to 1000 queued messages). Make that a dropping buffer, or a blocking, or a throwing, you can pick, in the case of reaching that limit as well. Finally have another process that takes from the channel of 1000 and puts to the channel of 6.


Anyways, my point was, I think the hardest part is what to do once the max is reached.
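
As one illustration of a different "max reached" strategy, the caller can give up after a timeout instead of blocking indefinitely. This is a sketch; try-send and permits are hypothetical names, and send-fn stands in for the real HTTP call.

```clojure
(import '(java.util.concurrent Semaphore TimeUnit))

(def permits (Semaphore. 6 true))

(defn try-send
  "Calls (send-fn msg) if a permit becomes free within timeout-ms,
  otherwise returns :rejected without calling send-fn."
  [send-fn msg timeout-ms]
  (if (.tryAcquire ^Semaphore permits (long timeout-ms) TimeUnit/MILLISECONDS)
    (try
      (send-fn msg)
      (finally (.release ^Semaphore permits)))
    :rejected))

(try-send identity "hello" 100) ;=> "hello"
```

Whether a rejected message is dropped, retried, or reported to the caller is the design decision the discussion above is pointing at.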


@U01EB0V3H39 The partition (actually partition-all) approach actually looks like the best solution for my particular use case. Although, I imagine pmap would be ideal over a future to fire off requests in parallel. This way, I'd also be able to get the HTTP response objects for each request, in order.


@U0K064KQV my problem is actually simpler, since I do not have a stream of requests to dispatch off to the thread pool. Instead I start with a vector of requests that I have to parallelize in chunks, so based on what Max suggested I came up with a solution like this:

(->> [1 2 3 4 5 6 7 8]
     (partition-all 4)
     (map (fn [chunk]
            (pmap (fn [page]
                    (:body (client/get (str "" page)))) chunk))))
^ this code requests 8 pages, in 2 chunks of 4 parallel requests. Another upside is that I end up with a return value of the list of 8 response maps corresponding to each request.


I was initially concerned that with straight up pmap you wouldn’t be able to control the thread pool size, but it appears that it uses a bounded pool for cpu-bound work and an unbounded one for io-bound work, so it should just “do the right thing”


And if that doesn’t work, it sounds like replacing pmap with map future followed by map deref might


pmap is controlled by [n (+ 2 (.. Runtime getRuntime availableProcessors))], so in theory if someone ran this on a single-core machine, you would not see parallelization anymore, I believe.


Another thing is that laziness and side-effects are tricky to get right. I don't know if you expect the caller to be able to do a (take 4) and only have 4 HTTP GET requests made? But I think that won't be the case because of chunked seqs.


I think you can address both by going eager and using future like so:

(->> [1 2 3 4 5 6 7 8]
     (partition-all 4)
     (mapv (fn [chunk]
             (->> chunk
                  ;; start every request in the chunk, then wait for all of them
                  (mapv (fn [page] (future (:body (client/get (str "" page))))))
                  (mapv deref)))))


Note that this function is now blocking, and you said your API is called send-push-notification-async, which I would expect to be non-blocking. But you can just wrap the whole thing in another future for that. It's a bit strange, so it's like a batching async API? You might want to call it send-push-notification-batch-async, since you give it a batch of messages as a vector and it then sends those out async.


Another issue with this by the way, is you don't control the number of requests made globally in the application. For example:

(defn send-push-notification-batch-async [batch]
  ;; outer future makes the call non-blocking; callers deref the result
  (future
    (->> batch
         (partition-all 4)
         (mapv (fn [chunk]
                 (->> chunk
                      (mapv (fn [page] (future (:body (client/get (str "" page))))))
                      (mapv deref)))))))

(let [first-batch (send-push-notification-batch-async [1 2 3 4 5 6 7 8])
     second-batch (send-push-notification-batch-async [1 2 3 4 5 6 7 8])
     third-batch (send-push-notification-batch-async [1 2 3 4 5 6 7 8])]
    (mapv deref [first-batch second-batch third-batch]))
You see, now you have 12 concurrent http gets being made, which if the endpoint had a limit I assume would error.


I know this is slightly thread necromancy, but thinking about this a little more recently, I realized some small quirks in the partition approach, and also in clojure.core/pmap. Let’s take an example of a fn to use with future/`pmap`/whatever:

(defn work [x] (Thread/sleep (* 1000 x))) ; x is in seconds
And let’s say we want a maximum of 4 works running at a time. If we do the partition approach on the following input:
[4 1 1 1 1]
for the 1st second, all 4 threads will be in use. But in the 2nd second, 3 threads will be sitting idle while we wait for item 0 to complete. clojure.core/pmap actually has the same problem:
(add-tap prn)
(->> (.. Runtime getRuntime availableProcessors)
     (+ 10)
     (#(repeat % 1))
     (cons 5)
     (map vector (range))
     (pmap #(do (Thread/sleep (* 1000 (second %)))
                (tap> (first %))))
     doall) ; force the lazy seq so every item actually runs
If you run this, you’ll see it wait 1 second, then print 1-14, then wait 4 seconds, then print 0, then wait 1 second, then print 15-22. Claypoole has a drop-in replacement for pmap that acts as you’d expect:
(require '[com.climate.claypoole :as cp])
(->> (.. Runtime getRuntime availableProcessors)
     (+ 10)
     (#(repeat % 1))
     (cons 5)
     (map vector (range))
     (cp/pmap (+ 2 (cp/ncpus))
              #(do (Thread/sleep (* 1000 (second %)))
                   (tap> (first %)))))
The above waits 1 second, prints 1-13, waits 1 more second, prints 14-22, then finally prints 0. So it minimizes thread idle time: no thread sits idle while work remains.


@thom704 Sounds like a job for a message queue. That way you can decouple the messaging components from one another.