2021-10-25
I believe if you are doing unchecked math, and your calls are all inlined properly, you will not get any calls to `num`
This is the code:
(defn smt-8' [times-vec]
  (binding [*unchecked-math* true]
    (loop [res (transient []) pointer-1 (int 0) pointer-2 (int 7)]
      (if-let [end-element (get times-vec pointer-2)]
        (let [start-element (get times-vec pointer-1)
              time-diff (- end-element start-element)]
          (recur
           (if (< time-diff 1000)
             (conj! res [(subvec times-vec pointer-1 (inc pointer-2))
                         time-diff])
             res)
           (inc pointer-1)
           (inc pointer-2)))
        (persistent! res)))))
If the unchecked math operation inlines itself, it becomes just a call to a static method, and the compiler, if the types are right, will recognize the method and replace it with just a JVM instruction
You're setting the var as part of the execution of the code, from the same compilation unit you are trying to affect
E.g. that binding doesn't happen until the code has been compiled and executed, and you are trying to use it to affect compilation
I see, you mean it applies at compilation, so I need to wrap the defn in the binding?
You just need to set! the var at the top level, so that it is compiled and executed before the form you care about is compiled
Anything you wrap in binding will be part of the same compilation unit, so binding cannot affect the compilation of the wrapped thing
The only form that doesn't group things together into a compilation unit is do, and binding expands into a try/finally, if I recall
I see, I'll try:
(do
  (set! *unchecked-math* true)
  (defn smt-8' [times-vec]
    (loop [res (transient []) pointer-1 (int 0) pointer-2 (int 7)]
      (if-let [end-element (get times-vec pointer-2)]
        (let [start-element (get times-vec pointer-1)
              time-diff (- end-element start-element)]
          (recur
           (if (< time-diff 1000)
             (conj! res [(subvec times-vec pointer-1 (inc pointer-2))
                         time-diff])
             res)
           (inc pointer-1)
           (inc pointer-2)))
        (persistent! res))))
  (set! *unchecked-math* false))
Ok, so :warn-on-boxed does show that `time-diff (- end-element start-element)` and `(< time-diff 1000)` are boxing. But how do I fix it?
It doesn't know the type of start or end element, so of course it doesn't know the type of the diff
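For illustration, a minimal sketch of the fix being pointed at here: primitive `long` hints give the compiler the types it needs, so the arithmetic compiles to primitive ops and the warning goes away (`diff-boxed`/`diff-primitive` are hypothetical names):

(set! *unchecked-math* :warn-on-boxed)

;; Boxed: the compiler can't infer the operand types, so it warns.
(defn diff-boxed [a b]
  (- a b))

;; Hinted: ^long args let `-` compile to a primitive subtraction.
(defn diff-primitive [^long a ^long b]
  (- a b))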
Ok cool, and it won't really do anything except take less memory right? Cause when you get from a vector-of it still boxes?
Maybe I'm reading this wrong: > While bound to true, compilations of +, -, *, inc, dec and the coercions will be done without overflow checks. While bound to :warn-on-boxed, same behavior as true, and a warning is emitted when compilation uses boxed math. Default: false. This says that :warn-on-boxed also changes all default +, -, *, inc, dec etc. to their unchecked variant? So it doesn't only warn
https://github.com/ztellman/primitive-math will give you a more reliable way of using the primitive ops, in my experience. Always worth leaving warn-on-boxed on though.
Hey, thanks for the info. Sometimes you don't want unchecked math though, so you know if anything has overflowed or not - like a Y2K kind of bug.
About that library: is there something it does to enable more primitive math, or does it just warn you better when it can't be used?
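If I recall the library's behavior correctly, it swaps in operators that refuse to compile unless they can be primitive, rather than only warning; a rough sketch, assuming the lib is on the classpath:

(require '[primitive-math :as p])

(p/+ 1 2) ;=> 3, compiled as a primitive long addition

;; Unlike clojure.core/+, these ops error at compile time when the
;; arguments can't be primitive, instead of silently boxing.

;; It can also shadow the core operators namespace-wide:
;; (primitive-math/use-primitive-operators)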
Hey, I'm wondering what the best way would be to map over a hash map of vectors, combining the vector values together, ideally only adding distinct vectors, i.e. if I took:
(def sched1 {"Sunday" [["00:00" "24:00"]], "Monday" [["00:00" "24:00"]], "Tuesday" [["00:00" "24:00"]], "Wednesday" [["00:00" "24:00"]], "Thursday" [["00:00" "24:00"]], "Friday" [["00:00" "24:00"]], "Saturday" [["00:00" "24:00"]]})
(def sched2 {"Sunday" [["00:00" "10:00"]], "Monday" [["00:00" "10:00"]]})
they would combine to:
{"Sunday" [["00:00" "24:00"]["00:00" "10:00"]], "Monday" [["00:00" "24:00"]["00:00" "10:00"]], "Tuesday" [["00:00" "24:00"]], "Wednesday" [["00:00" "24:00"]], "Thursday" [["00:00" "24:00"]], "Friday" [["00:00" "24:00"]], "Saturday" [["00:00" "24:00"]]}
(merge-with (comp vec concat) map-1 map-2 ....)
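For instance, with the two schedules above:

(merge-with (comp vec concat) sched1 sched2)
;=> {"Sunday"  [["00:00" "24:00"] ["00:00" "10:00"]],
;    "Monday"  [["00:00" "24:00"] ["00:00" "10:00"]],
;    "Tuesday" [["00:00" "24:00"]],
;    ...}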
oh sweet thanks yeah that works great -- one thing I notice: where it is actually concatting, it is returning seqs, i.e.
{"Sunday" (["00:00" "24:00"] ["00:00" "10:00"]),
"Monday" (["00:00" "24:00"] ["00:00" "10:00"]),
"Tuesday" [["00:00" "24:00"]],
"Wednesday" [["00:00" "24:00"]],
"Thursday" [["00:00" "24:00"]],
"Friday" [["00:00" "24:00"]],
"Saturday" [["00:00" "24:00"]]}
yes, (comp vec concat) returns vectors as you needed.
another similar problem: coming from a list of maps, how do I combine them into one map, merging their keys in the same fashion, i.e.:
({"Sunday" [["00:00" "10:00"]]}
{"Sunday" [["12:00" "16:00"]]}
{"Monday" [["00:00" "10:00"]]}
{"Tuesday" [["00:00" "10:00"]]}
{"Wednesday" [["00:00" "10:00"]]}
{"Thursday" [["00:00" "10:00"]]}
{"Friday" [["00:00" "10:00"]]}
{"Saturday" [["00:00" "10:00"]]})
is translated to:
({"Sunday" [["00:00" "10:00"] ["12:00" "16:00"]]
"Monday" [["00:00" "10:00"]]
"Tuesday" [["00:00" "10:00"]]
"Wednesday" [["00:00" "10:00"]]
"Thursday" [["00:00" "10:00"]]
"Friday" [["00:00" "10:00"]]
"Saturday" [["00:00" "10:00"]]})
Same reply as in the previous thread, only add `apply` in front of `merge-with`. Also, you can replace `(comp vec concat)` with just `into`.
user=> (pprint d)
({"Sunday" [["00:00" "10:00"]]}
{"Sunday" [["12:00" "16:00"]]}
{"Monday" [["00:00" "10:00"]]}
{"Tuesday" [["00:00" "10:00"]]}
{"Wednesday" [["00:00" "10:00"]]}
{"Thursday" [["00:00" "10:00"]]}
{"Friday" [["00:00" "10:00"]]}
{"Saturday" [["00:00" "10:00"]]})
nil
user=> (pprint (apply merge-with into d))
{"Sunday" [["00:00" "10:00"] ["12:00" "16:00"]],
"Monday" [["00:00" "10:00"]],
"Tuesday" [["00:00" "10:00"]],
"Wednesday" [["00:00" "10:00"]],
"Thursday" [["00:00" "10:00"]],
"Friday" [["00:00" "10:00"]],
"Saturday" [["00:00" "10:00"]]}
nil
And you can deal with the duplicates by using sets instead of vectors. But if you care about the order of the sub-vectors, then it gets a bit more complex.
I had written a function to dedupe after the fact:
(defn f1 [m]
  (reduce (fn [res [k v]]
            (assoc res k (vec (distinct v))))
          {} m))
That one is alright. Two things I'd change:
• Replace `reduce` + destructuring `[k v]` with `reduce-kv` + plain `k v`
• If performance is important, consider replacing `(vec (distinct v))` with `(into [] (distinct) v)`, but of course measure first
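Putting both suggestions together, a sketch of the revised function:

(defn f1' [m]
  (reduce-kv (fn [res k v]
               (assoc res k (into [] (distinct) v)))
             {} m))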
Deduping can also be done with sets (`#{}`, https://clojuredocs.org/clojure.core/set), although the intent might be less clear with that
Less clear intent, almost certainly worse performance, and the loss of the original order. :)
if you wanted to include the dedupe part in the same step, you could always use the `distinct` transducer?
(let [d [{"Sunday" [["00:00" "10:00"]]}
         {"Sunday" [["12:00" "16:00"]]}
         {"Monday" [["00:00" "10:00"]]}
         {"Tuesday" [["00:00" "10:00"]]}
         {"Wednesday" [["00:00" "10:00"]]}
         {"Thursday" [["00:00" "10:00"]]}
         {"Friday" [["00:00" "10:00"]]}
         {"Saturday" [["00:00" "10:00"]]}]]
  (transduce (comp cat
                   (mapcat #(map vector (repeat (key %1)) (val %1)))
                   (distinct))
             (completing #(update %1 (first %2) (fnil conj []) (second %2)))
             {}
             d))
or using the xforms lib:
;; assumes (require '[net.cgrand.xforms :as x])
(let [d [{"Sunday" [["00:00" "10:00"]]}
         {"Sunday" [["12:00" "16:00"]]}
         {"Monday" [["00:00" "10:00"]]}
         {"Tuesday" [["00:00" "10:00"]]}
         {"Wednesday" [["00:00" "10:00"]]}
         {"Thursday" [["00:00" "10:00"]]}
         {"Friday" [["00:00" "10:00"]]}
         {"Saturday" [["00:00" "10:00"]]}]]
  (into {}
        (comp cat
              (x/by-key first
                        (comp (mapcat second)
                              (distinct)
                              (x/into []))))
        d))
I'd argue it's significantly more opaque. But if someone knows xforms already then it might be fine.
Yeah ... that's fair enough ... I'm liking the transducer forms more these days ... but yup ... I can accept this is probably less clear
What would be a good approach to queue HTTP requests in Clojure? Basically I'm hitting a certain HTTP API endpoint and I only want to make 6 concurrent requests at any point. If someone could suggest a good high-level approach/library that helps with it, it'd be very much appreciated! Off the top of my head, I'm thinking of having 1 atom counter that maintains the current number of parallel requests & spawning off up to 6 futures at a time - incrementing the counter on each occasion (provided the counter is 6 or less). When each future finishes execution, it decrements the counter. Do you see a flaw with this approach, is there something better I could do? Thanks!
@U2FRKM4TW I will have a look, thanks. I'm still new to concurrency approaches in Clojure/Java land.
It would work with virtual threads, hypothetically, and it doesn't require dedicated threads for the api calls - you could do it in, say, your 50 or so jetty threads
(import 'java.util.concurrent.Semaphore)

(def *stroopwaffle-api-semaphore* (Semaphore. 6))

(defn make-api-call
  [arg]
  (try
    (.acquire *stroopwaffle-api-semaphore*)
    (http/get ...)
    (finally (.release *stroopwaffle-api-semaphore*))))
So it's better if and only if you're using some kind of green threads. But with regular threads it's actually worse. Right?
I know we have to do a more complicated strategy to make sure we stay below facebook's rate limits, so this isn't the solution
It's worse in terms of the amount of code you have to write and care you have to put in. It's easy to imagine someone using that code to rate limit something while submitting all the tasks to an unbounded thread pool. And now instead of 6 threads you have one semaphore and N threads with N having no upper bound. Whereas a fixed thread pool is literally just one line of code, and it's doing everything for you.
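For illustration, a minimal sketch of that one-liner pool (`fake-request` is a stand-in for the real http call):

(import '(java.util.concurrent Callable Executors))

(defn fake-request [n]
  (Thread/sleep 100)
  {:status 200 :id n})

(def pool (Executors/newFixedThreadPool 6)) ; the bound lives here

;; Submit everything up front; the pool runs at most 6 at a time:
(let [futures (mapv #(.submit pool ^Callable (fn [] (fake-request %)))
                    (range 20))]
  (mapv #(.get %) futures))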
it all depends on the guarantees you want and what your app is doing. If you can get away with just using a threadpool, it's good, but if you want to make sure api calls in http requests respect the limit, wrapping the namespace in "max 6" where the calls are made is more resilient
like you can imagine starting with a fixed thread pool of 6 and you do a task that talks to this api and does some queries on your db
but the queries take longer than the api call, so you have a pressure to increase the size of the thread pool to do it faster but that hits your api rate limit
Just like you didn't put `(query-the-db)` under that semaphore, I wouldn't schedule it on the same thread pool.
since instead of an atomic counter that goes up to 6 and threads wait on it, other threads will have a threadpool of 6 threads they hand data off to, wait on, and then get data back
part of the reason a threadpool can be trouble is that people naturally submit their whole task to it. The distance between the implicit assumption that you will only call some api with some max concurrency and the part of the code where you actually call that api can be large
Assuming I understand you correctly, you're inventing a situation OP hasn't described at all, and neither have I. I'm still operating in the "need to do X requests, with up to Y requests in flight" realm.
Anything else depends on everything else. Without further description of the problem I would only add "check out `core.async` and maybe `ExecutorCompletionService`".
> you're inventing a situation OP hasn't described at all, and neither have I
kinda, but that's because the fixed thread pool and the semaphore both work. I'm trying to justify when the extra `(with-acquire semaphore ...)` lines of code might be more appropriate, and that relies on context outside of the problem description
My specific use case is to handle push notifications in a library that I'm writing. The consumer of the library would call a method in my library (say `send-push-notification-async`, with a token that specifies the device that must get the push notification) and my library would hit a server that'd send the push notification to the specified device. The library would also allow the consumer to send notifications to several devices, if the method above is passed, say, an array of device tokens (could be thousands at a time).
The only caveat is that the push notification server/service requires that I call it with only up to 6 requests at max concurrently. So I'd have a queue with all the requests and use this thread pool/semaphore to batch requests to this external service. Hope this gives more context.
in that case:
• do you want to impose the stateful resource of a thread onto your library users
• is max N concurrency on a single machine enough if your users want to use it from the standard 3 api servers?
The library would be stateless as far as the end user is concerned. And yes, the assumption is that the library would be invoked from a single machine (in case of a distributed system, the consumer of the library would have to handle the distributed part).
The library will have to handle rate limiting (that's a requirement), although the size of the threadpool can be passed in as an option by the user.
• The library would be stateless as far as the end user is concerned.
• the size of the threadpool can be passed in as an option by the user.
pick one
I was a bit unclear on your question, yes the library will take in an options object that allows the user to specify the size of max_concurrent_requests (but it's very unlikely they'd change the default value of 6).
(def p (push-lib/init {:max-concurrent-requests 6}))
(p/send-push-notification-async [<array-of-notification-objects>])
^ it'd be something like this. But the `:max-concurrent-requests` param is optional.
What are people's opinions on the pros/cons of using `core.async/pipeline` for this sort of thing? It has settable parallelism and uses a cached thread pool.
It's great when it's appropriate. Which is not always the case given this:
> Outputs will be returned in order relative to the inputs.
I'd also argue that if you don't need the rest of `core.async` then you should use something else.
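For reference, a sketch of the pipeline approach; `pipeline-blocking` is the variant meant for blocking IO, and `fake-request` stands in for the real call:

(require '[clojure.core.async :as a])

(defn fake-request [n]
  (Thread/sleep 100)
  {:page n})

(let [in  (a/to-chan! (range 8))
      out (a/chan 8)]
  ;; parallelism 6; outputs come back in input order
  (a/pipeline-blocking 6 out (map fake-request) in)
  (a/<!! (a/into [] out)))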
Also `clj-http` allows using a connection manager with a fixed number of threads, reducing the burden of doing this by hand.
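If I remember clj-http's pooled connection manager API right, that looks roughly like this (URL and option values are illustrative):

(require '[clj-http.client :as client]
         '[clj-http.conn-mgr :as conn-mgr])

;; A reusable pool capped at 6 connections:
(def cm (conn-mgr/make-reusable-conn-manager
         {:threads 6 :default-per-route 6}))

(client/get "https://example.com" {:connection-manager cm})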
@U2FRKM4TW in your opinion is having a fixed thread pool a suitable solution here?
The problem description is rather superficial so hard to say for certain. But assuming you don't plan to do much with the results of the requests, then yes, a fixed thread pool is suitable and is the simplest solution.
But given what Fredrik said about `clj-http`, it might be even simpler, assuming you're already using that library.
@U2FRKM4TW Thanks, I think the return values are indeed required for further processing, so perhaps I'll have to look into alternatives to a thread pool.
I mean honestly you might be able to get pretty far either with clj-http's built-in connection pooling or even just a mix of `partition` and `future`
To me semaphores seem like overkill if all you want to do is only have n requests in flight at a time
The hard part of your problem is not the limit, it's the strategy for when someone asks to send one more message but you've hit the max of 6 already
If you want to make the caller block, I would agree with using https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Semaphore.html
You'd do something like:
(import 'java.util.concurrent.Semaphore)

(def http-permits (Semaphore. 6 true)) ; fair mode: blocked threads are served FIFO

(defn send-msg []
  (try
    (.acquire http-permits)
    (http/post ...)
    (finally (.release http-permits))))
So assuming 6 threads called `send-msg`, they'd all be allowed to call http/post, but assuming those 6 are still going, the 7th thread that calls send-msg will block, and if an 8th thread calls send-msg as well, it too will block and will be queued up after thread 7.
Once one of the first 6 threads is done, its finally block will release a permit, and thread 7 will be unblocked and allowed to proceed, etc.
The Semaphore manages the count of maximum concurrent calls, and it also manages the blocking/unblocking, the queuing up of blocked threads, and the order of unblocking them, all for you.
But that's one strategy. If you don't want to block the caller thread, then you'll want some kind of Queue instead. I think `core.async` would be good for that. You can spawn one process that grabs from a channel and sends the http requests. Have the channel buffer be 6.
Now have another channel with a much bigger buffer, say of 1000 (if you want to allow up to 1000 queued messages). Make that a dropping buffer, or a blocking one, or a throwing one - you can pick what happens when that limit is reached as well.
Finally have another process that takes from the channel of 1000 and puts to the channel of 6.
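A rough sketch of that shape - here six worker loops drain the bounded channel, so at most 6 sends run at once, and `send-one!` stands in for the http call:

(require '[clojure.core.async :as a])

(defn send-one! [msg]
  (Thread/sleep 100)
  msg)

(def pending   (a/chan (a/dropping-buffer 1000))) ; up to 1000 queued msgs
(def in-flight (a/chan 6))                        ; bounded hand-off

(a/pipe pending in-flight) ; shuttle queued messages to the workers

(dotimes [_ 6] ; 6 workers => max 6 concurrent sends
  (a/thread
    (loop []
      (when-some [msg (a/<!! in-flight)]
        (send-one! msg)
        (recur)))))

;; Callers enqueue without blocking; puts are dropped once 1000 are queued:
;; (a/>!! pending notification)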
Anyways, my point was, I think the hardest part is what to do once the max is reached.
@U01EB0V3H39 The `partition` (actually `partition-all`) approach actually looks like the best solution for my particular use case. Although, I imagine `pmap` would be ideal over a future to fire off requests in parallel. This way, I'd also be able to get the HTTP response objects for each request, in order.
@U0K064KQV my problem is actually simpler, since I do not have a stream of requests to dispatch off to the thread pool. Instead I start with a vector of requests that I have to parallelize in chunks, so based on what Max suggested I came up with a solution like this:
(->> [1 2 3 4 5 6 7 8]
     (partition-all 4)
     (map (fn [chunk]
            (pmap (fn [page]
                    (:body (client/get (str "" page))))
                  chunk)))
     (flatten))
^ this code requests 8 pages, in 2 chunks of 4 parallel requests. Another upside is that I end up with a return value of the list of 8 response maps corresponding to each request.
I was initially concerned that with straight-up `pmap` you wouldn't be able to control the thread pool size, but it appears that it uses a bounded pool for cpu-bound work and an unbounded one for io-bound work, so it should just "do the right thing"
https://stackoverflow.com/a/11322660
And if that doesn't work, it sounds like replacing `pmap` with `map` + `future` followed by `map deref` might
pmap's parallelism is controlled by `(+ 2 (.. Runtime getRuntime availableProcessors))`, so in theory if someone ran this on a single-core machine, you would not see parallelization anymore, I believe.
Another thing is that laziness and side-effects are tricky to get right. I don't know if you expect the caller to be able to do a `(take 4)` and only have 4 http get requests made? But I think it won't be the case because of chunked seqs
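To see the chunking effect concretely (`println` standing in for the request):

(->> (range 100)                          ; range yields chunked seqs
     (map #(do (println "request" %) %))  ; one "request" per element
     (take 4)
     (doall))
;; prints "request 0" through "request 31": the whole 32-element
;; chunk is realized even though only 4 results are kept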
I think you can address both by going eager and using future like so:
(->> [1 2 3 4 5 6 7 8]
     (partition-all 4)
     (mapv (fn [chunk]
             (->> chunk
                  (mapv (fn [page] (future (:body (client/get (str "" page))))))
                  (mapv deref))))
     (flatten))
Just now this function is blocking, and you said your API is called `send-push-notification-async`, which I would expect to be non-blocking. But you can just wrap the whole thing in another future for that.
It's a bit strange, so it's like a batching async API? Might want to call it `send-push-notification-batch-async`, since you give it a batch of messages as a vector and it then sends those out async.
Another issue with this by the way, is you don't control the number of requests made globally in the application. For example:
(defn send-push-notification-batch-async
  [batch]
  (future
    (->> batch
         (partition-all 4)
         (mapv (fn [chunk]
                 (->> chunk
                      (mapv (fn [page] (future (:body (client/get (str "" page))))))
                      (mapv deref))))
         (flatten))))

(let [first-batch  (send-push-notification-batch-async [1 2 3 4 5 6 7 8])
      second-batch (send-push-notification-batch-async [1 2 3 4 5 6 7 8])
      third-batch  (send-push-notification-batch-async [1 2 3 4 5 6 7 8])]
  (mapv deref [first-batch second-batch third-batch]))
You see, now you have 12 concurrent http gets being made, which, if the endpoint had a limit, I assume would error.
I know this is slightly thread necromancy, but thinking about this a little more recently, I realized some small quirks in the `partition` approach, and also in `clojure.core/pmap`.
Let's take an example of a fn to use with `future`/`pmap`/whatever:
(defn work [x] (Thread/sleep (* 1000 x))) ; x in seconds
And let's say we want a maximum of 4 `work`s running at a time. If we do the partition approach on the following input:
[4 1 1 1 1]
for the 1st second, all 4 threads will be in use. But in the 2nd second, 3 threads will be sitting idle while we wait for item 0 to complete.
`clojure.core/pmap` actually has the same problem:
(add-tap prn)

(->> (.. Runtime getRuntime availableProcessors)
     (+ 10)
     (#(repeat % 1))
     (cons 5)
     (map vector (range))
     (pmap #(do (Thread/sleep (* 1000 (second %)))
                (tap> (first %))
                %)))
If you run this, you'll see it wait 1 second, then print 1-14, then wait 4 seconds, then print 0, then wait 1 second, then print 15-22.
https://github.com/TheClimateCorporation/claypoole has a drop-in replacement for `pmap` that acts as you'd expect:
(require '[com.climate.claypoole :as cp])

(->> (.. Runtime getRuntime availableProcessors)
     (+ 10)
     (#(repeat % 1))
     (cons 5)
     (map vector (range))
     (cp/pmap (+ 2 (cp/ncpus))
              #(do (Thread/sleep (* 1000 (second %)))
                   (tap> (first %))
                   %)))
The above waits 1 second, prints 1-13, waits 1 more second, prints 14-22, then finally prints 0. So it minimizes thread idle time; no thread is ever not doing anything.
@thom704 Re https://clojurians.slack.com/archives/C03S1KBA2/p1635147158169400?thread_ts=1635093533.129400&cid=C03S1KBA2: Sounds like a job for a message queue. That way you can decouple the messaging components from one another.