This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-01-04
Channels
- # announcements (1)
- # babashka (1)
- # beginners (84)
- # biff (22)
- # calva (9)
- # cider (8)
- # clerk (5)
- # clj-kondo (10)
- # clojure (105)
- # clojure-europe (13)
- # clojure-nl (1)
- # clojure-norway (44)
- # clojure-spec (4)
- # clojure-uk (6)
- # clojuredesign-podcast (36)
- # cursive (13)
- # datomic (24)
- # dev-tooling (8)
- # emacs (8)
- # hyperfiddle (4)
- # jobs (1)
- # leiningen (2)
- # london-clojurians (1)
- # lsp (5)
- # malli (6)
- # membrane (11)
- # nyc (1)
- # off-topic (14)
- # other-languages (8)
- # pathom (25)
- # pedestal (2)
- # re-frame (4)
- # releases (1)
- # remote-jobs (1)
- # shadow-cljs (98)
- # sql (5)
- # squint (1)
- # tools-deps (38)
- # vim (8)
- # xtdb (11)
I made a way to convert an InputStream to Base64; this is Java interop that worked as expected:
(s/defn ^:private stream-to-base64-java-interop
  [input-stream :- InputStream]
  (let [output-stream (ByteArrayOutputStream.)]
    (loop [buffer (byte-array 1024)]
      (let [n (.read input-stream buffer)]
        (if (pos? n)
          (do
            (.write output-stream buffer 0 n)
            (recur buffer))
          (let [bytes (.toByteArray output-stream)]
            (String. (.encode (Base64/getEncoder) bytes) "UTF-8")))))))
Is there a more idiomatic Clojure function, or an existing lib that wraps this? Thanks!
This improved a lot:
(s/defn ^:private stream-to-base64-clojure-way
  [input-stream :- InputStream]
  (let [output-stream (ByteArrayOutputStream.)]
    (io/copy input-stream output-stream)
    (let [bytes (.toByteArray output-stream)]
      (String. (.encode (Base64/getEncoder) bytes) "UTF-8"))))
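On JDK 9+ (and assuming the stream fits in memory, as it already does with the ByteArrayOutputStream approach above), the whole function can be sketched as a one-liner via InputStream.readAllBytes and Base64.Encoder.encodeToString:

```clojure
(import '(java.io InputStream ByteArrayInputStream)
        '(java.util Base64))

;; JDK 9+: readAllBytes slurps the stream, and encodeToString
;; returns the Base64 string directly (no manual UTF-8 step).
(defn stream->base64 [^InputStream in]
  (.encodeToString (Base64/getEncoder) (.readAllBytes in)))

(stream->base64 (ByteArrayInputStream. (.getBytes "hi")))
;; => "aGk="
```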
https://docs.oracle.com/javase/8/docs/api/java/util/Base64.Encoder.html#wrap-java.io.OutputStream-
late to the party, but I have these, which work from all of the input types supported by https://clojuredocs.org/clojure.java.io/copy to any output type coercible by io/output-stream, plus strings:
(let [b64 (Base64/getEncoder)]
  (defn ->b64-out
    [in out]
    (with-open [out-stream (io/output-stream out)
                b64-stream (.wrap b64 out-stream)]
      (io/copy in b64-stream)))

  (defn ->b64-str
    [in]
    (with-open [baos (ByteArrayOutputStream.)]
      (->b64-out in baos)
      (.toString baos "UTF-8"))))
(comment
  (let [r (Random.)
        bs (byte-array 32)]
    (.nextBytes r bs)
    (->b64-out bs "/dev/null")
    (->b64-str bs)
    ;; activate space-heating mode
    (->b64-out (io/file "/dev/random") "/dev/null")))
Not late to the party! 🙂
how to increment the age of the following data using specter? I asked ChatGPT, but running the code below returns an error:
(require '[com.rpl.specter :as s])

(def data
  {:users [{:name "Alice" :age 25}
           {:name "Bob" :age 30}
           {:name "Charlie" :age 22}]
   :projects [{:title "Project A" :status "active"}
              {:title "Project B" :status "inactive"}
              {:title "Project C" :status "active"}]})

(def selector [:users ALL :age])
(def updated-data (s/transform selector inc data))
(println updated-data)
ChatGPT has a reasonable potential to save you time only if you can check and correct its output. If you can't, then your only hope is that it gives you the right answer right away, which IME rarely happens.
> ... code return error
is maybe not specific enough. Was the error about ALL not being resolvable?
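If that is the error, the fix is to qualify the navigator through the require alias, since the code above brings in specter as s but then uses bare ALL:

```clojure
(require '[com.rpl.specter :as s])

(def data
  {:users [{:name "Alice" :age 25}
           {:name "Bob" :age 30}
           {:name "Charlie" :age 22}]})

;; With an :as alias, specter's navigators must be qualified:
;; bare ALL won't resolve, but s/ALL does.
(s/transform [:users s/ALL :age] inc data)
;; => {:users [{:name "Alice" :age 26}
;;             {:name "Bob" :age 31}
;;             {:name "Charlie" :age 23}]}
```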
Hello people, a question regarding the interaction of sockets and Java virtual threads, in this case the interaction of two tools, clj-sockets and virtuoso, for example. In my code I'm planning to create a secure socket connection over the network to another system. Once connected, the Clojure code in question begins reading the socket, line by line, in a loop, waiting for messages. When the connection receives a full message, the Clojure code parses it, writes an acknowledgement of "message received" to the socket, and then continues the read loop waiting for more content.
With that functionality as context, does it make sense to use virtuoso to create multiple virtual threads, one thread per socket? Or am I shooting myself in the foot performance-wise if I use virtuoso with this kind of implementation?
The question comes from a lack of understanding of what happens under the hood when Clojure reads socket messages line by line in a loop. The Java video described virtual threads; to paraphrase: "Virtual threads shouldn't be used to perform in-memory computations; rather, they should be used when blocking actions occur in a thread." My limited understanding is that I'm technically not being blocked, at least not for a significant period of time, when I'm reading the content of a socket.
So is a virtual thread the correct choice for the kind of implementation I'm making, or should I adjust either the logic or use something other than a virtual thread?
Edit: the context of this question comes from the YouTube video "Java 21 new feature: Virtual Threads".
Never mind, the above question is moot. I did some more digging and it turns out I'd understood the read-line functionality of clj-sockets incorrectly. On a healthy connection read-line is a blocking operation: when called, it waits until it gets a new line from the other system, so the performance benefit of virtual threads could be present.
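That conclusion can be sketched with plain interop (assumes JDK 21+; this uses neither clj-sockets nor virtuoso, and the names handle-connection/serve are made up). Because readLine blocks on socket I/O, a virtual thread parks cheaply and releases its carrier thread until data arrives, which is exactly the thread-per-connection case virtual threads are designed for:

```clojure
(require '[clojure.java.io :as io])

;; Read lines from the socket in a loop; on each full line,
;; acknowledge and keep reading (mirrors the design described above).
(defn handle-connection [^java.net.Socket sock]
  (with-open [rdr (io/reader sock)
              wtr (io/writer sock)]
    (loop []
      (when-let [line (.readLine ^java.io.BufferedReader rdr)]
        ;; parse `line` here, then acknowledge
        (.write wtr "message received\n")
        (.flush wtr)
        (recur)))))

;; Accept loop: one virtual thread per connection.
(defn serve [^java.net.ServerSocket server]
  (loop []
    (let [sock (.accept server)]
      (Thread/startVirtualThread #(handle-connection sock))
      (recur))))
```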
Hi! Does Clojure have some way of doing this:
• I call a function that does a lot of work, in my case generating a large PDF.
• if that takes too long, I want to abort mission, i.e. I want a timeout.
If possible, what would be the best way of doing things?
Depends on how that PDF generation is written. Clojure itself doesn't really offer anything in addition to what Java already supports, so any suitable Java solution will do just fine.
E.g. you can use future-cancel on a future. But then you have to make sure that the code that the future runs is actually interruptible.
Yeah, with the latest JDKs it is only possible if that long-running function is prepared to be cancelled. Which means it should be checking from time to time whether its current thread .isInterrupted(), so you can then call longRunningThread.interrupt() from a different thread
> Which means it should be checking if its current thread .isInterrupted()
Not necessarily, since e.g. Object.wait and some other things throw if called from an interrupted thread.
But an explicit check is, well, explicit. :)
yeah, but if the function is in a loop, let's say crunching numbers, that is taking forever, the only supported way is for it to check .isInterrupted(), isn't it?
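A minimal sketch of that pattern (crunch is a made-up stand-in for the long-running work): the loop polls the interrupt flag each iteration, and future-cancel delivers the interrupt after a deref timeout:

```clojure
;; A busy loop made cancellable by polling the interrupt flag.
(defn crunch []
  (loop [n 0]
    (if (.isInterrupted (Thread/currentThread))
      ::cancelled
      (recur (inc n)))))

(def f (future (crunch)))

;; Wait up to 50ms for a result; on timeout, interrupt the
;; future's thread via future-cancel.
(let [res (deref f 50 ::timed-out)]
  (when (= res ::timed-out)
    (future-cancel f)))
```

Without that explicit check in the loop body, future-cancel sets the interrupt flag but the computation never notices it, which is exactly the library problem discussed below.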
got it
I understand the problems Thread.stop() was causing, but it's weird they couldn't provide a replacement. Currently there is no way to stop threads running code that isn't prepared to be stopped, which is probably the case for a ton of libraries
I guess that might create some problems for dev tooling but I'm tempted to think it's a good thing in general
I don't think it is only related to dev tooling. In general if your application has some possibly long running actions, for good UX, you want to provide functionality to the user to cancel the current task. If your application uses third party code to do some of those tasks (like a PDF creation lib) then you depend on all of them implementing a way of interrupting.
exactly, this is now the case. So my "fix" would now be to create some HTTP link to an external worker server, and have an HTTP timeout…
but that has tons of other drawbacks, and of course the other server still has that problem, but at least my app doesn't become unresponsive
maybe you can fork the PDF library and add the .isInterrupted() check in some loop?
thanks.. that'd be clj-pdf, which in turn uses LibrePDF, so a bit of a double-forking thing then 🙂
Not necessarily - you should be able to depend on clj-pdf and provide your own version of LibrePDF.
But it depends on how exactly a PDF is generated. It might be the opposite - maybe you need to fork clj-pdf but not LibrePDF. Or, as you said, maybe you have to fork both if they both have busy loops that never check for thread interruption.
I mean, it would be nice to have such a general and reliable mechanism, but since it doesn't exist it's probably better that the dangerous stop method cannot(?) be used anymore
what I would do in this case is run VisualVM, attach it to the process, then start generating a big PDF and do a thread dump in the middle. Maybe from the stack trace and the source code you can figure out the loop you need to modify
or if you run it from a terminal clj REPL you can hit Ctrl-\ to get a thread dump too
maybe you already checked, but I would also make sure it doesn't already implement something for it with something like:
(def t (Thread. (fn [] (create-big-pdf))))
(.start t)
(.interrupt t)
I think the main problem with killing a thread that's doing this sort of work is leaking resources like file handles and so on. If you fork in the POSIX sense of running it in a separate process, then you can rely on the kernel to clean up when you kill the worker process. That might be safer and have the semantics you're after?
Another thing to consider is maybe using core.async to pass the work off to a separate process and listen to a return channel for the response with an 'alt' with a timeout. This won't kill the process that's generating the PDF but will mean that if it takes too long you return to the main processing.
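That shape can be sketched with async/thread for the worker and alt!! to race the result against a timeout channel (with-timeout is a made-up helper name). As noted, the worker keeps running on timeout; we merely stop waiting for it:

```clojure
(require '[clojure.core.async :as async])

;; Run the thunk on its own thread and race its result channel
;; against a timeout channel.
(defn with-timeout [msec thunk]
  (let [result (async/thread (thunk))]
    (async/alt!!
      result ([v] v)
      (async/timeout msec) ::timed-out)))
```

Usage would look like `(with-timeout 5000 #(generate-pdf data))`, where generate-pdf stands in for the slow work.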
Is it known/expected that using buffered core.async channels will lead to a performance drop? I have an example where I see 25% longer run times after adding a buffer (the size doesn't seem to matter; just the difference between buffered/unbuffered).
Benchmarking is hard, concurrency is complicated, would need more details about the testing
It is a well-known phenomenon that overloaded systems will struggle along for a while and then have performance fall off a cliff, so something like that could be happening (a buffer allowing more load on an already overloaded system)
let me share the example, because it doesn't seem to fall in either of the categories you describe
here it is: https://github.com/a1exsh/1brc/blob/main/avg.clj#L62 the difference is ~2000ms unbuffered vs 2500ms if I introduce buffer of at least 1 at that line
not helpful on the performance front, but it seems like the only reason you are using concurrency here is basically to send values backwards to computations set up by previous steps in the reduce to aggregate values, when reduce is already an aggregating process? seems unlikely the overhead of concurrency is ever going to be overcome
yea, it's not really clear why you're using async operations here when you could just be using regular data structures
core.async isn't really for parallelizing work, it is for coordinating concurrent processes, so go blocks turn into callbacks, and those callbacks run on a fixed-size internal threadpool
yeah, good point. I'm just experimenting, looking for options to utilize the resources as much as possible
I sort of think when you add a buffer it is freeing up the reduce to rip through its work, then swamping the fixed pool
there's kind of a lot going on. I might try creating a flamegraph with https://github.com/clojure-goes-fast/clj-async-profiler to see if it provides any insights, but go blocks are notorious for making flame graphs less useful.
I have a variant with agents at the bottom of that file, it runs slower at ~3000ms, and has a problem with getting the results reliably, that I sort of solved with blocking on the results-chan here
https://github.com/aphyr/tesser might be something to look at, it is for parallelizing folds
the problem I see is that reading lines is going to dominate the runtime, so parallelizing the stats aggregation won't help.
if the time it took to calculate stats for each line was really involved, then parallelizing the stats would help.
if you can break the file into multiple chunks or separate the file into multiple files, then you can do a map+reduce style parallel computation.
reading 1 million lines with line-seq is 180ms. reading and parsing the way I do it currently is ~500ms
I thought about scanning multiple portions of the file in parallel, but that looks very unattractive (don't want to go there for a puzzle; would go for it for a real problem 🙂)
@U0NCTKEV8 stat-chan at line 62
I bet the parsing takes longer than the stats aggregation. If anything, I would parallelize that.
or Long/MAX_VALUE, or if you want to be less silly something close to the expected number of results
@U7RJTCH6J how would you parallelize the parsing? using pmap, for example? I think I tried that and it made the performance even worse
you can search this slack to find people railing against clojure.core/pmap and suggesting alternatives
> I think I tried that and it ruined the performance even worse
that's not surprising. I'm not sure multiple threads are going to speed anything up (without chunking the file) due to Amdahl's law. I think pmap might be able to do OK, but you would want to chunk the lines first into reasonably sized tasks. pmap doesn't do well if each individual task is small.
hm, interesting. I guess maybe I could batch lines into blocks of some 1000s and feed those batches to pmap
I might try that, but fair warning, it might not help.
a lot of "map" type stuff is actually not what you want here, because mapping is inherently ordered (usually in Clojure) and I don't think you really care about ordering
that's true. I just need to be able to read as fast as possible from that file (it is expected to reside in the disk cache)
so something like https://github.com/clj-commons/claypoole/blob/master/src/clj/com/climate/claypoole.clj#L429-L433
claypoole is a library that is basically built around "could we fix pmap?". It may be the library I suggest to people most often, despite never having personally used it
my approach would be to use https://github.com/cgrand/xforms and compose the pipeline via transducers. if you did parallelize it, it's pretty straightforward to reuse the individual steps.
(fn [branch f init coll]
  (reduce
   (fn [a' ch]
     (let [a' (conj a' ch)]
       (if (>= (count a') branch)
         [(async/reduce f init (async/merge a'))]
         a')))
   []
   coll)) ; written but not run, etc
ah, I guess what helps here is that min/max/sum/count that I have can be calculated separately for each batch and then aggregated in a second step across batches (similar to that diagram in tesser)...
that reduce in the thread should likely be a transduce with a map transducer that does the parsing
min max sum and count are trivially commutative monoids which makes them great for processing via folds
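Concretely, that combine step can be sketched like this (a sketch only; the map shape and names like combine-stats/merge-batches are made up, not taken from the linked repo). Each batch reduces to a small stats map per key, and two such maps combine associatively and commutatively, so partial results can be merged in any order or in a tree:

```clojure
;; Combining two stats maps is associative and commutative,
;; which is what makes tree-shaped / out-of-order folds safe.
(defn combine-stats [a b]
  {:min   (min (:min a) (:min b))
   :max   (max (:max a) (:max b))
   :sum   (+ (:sum a) (:sum b))
   :count (+ (:count a) (:count b))})

;; Merge per-batch results keyed by station name.
(defn merge-batches [batch-results]
  (apply merge-with combine-stats batch-results))
```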
if you look at https://github.com/cgrand/xforms, there are a couple of options for running several aggregations in a single pass, e.g. transjuxt
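Without reaching for the lib, the single-pass idea that transjuxt generalizes can be sketched with one reducing fn over transduce, carrying all four aggregates at once:

```clojure
;; One pass, four aggregates: init in the 0-arity, completion in
;; the 1-arity, accumulation in the 2-arity.
(defn stats-rf
  ([] {:min Long/MAX_VALUE :max Long/MIN_VALUE :sum 0 :count 0})
  ([acc] acc)
  ([acc v]
   {:min   (min (:min acc) v)
    :max   (max (:max acc) v)
    :sum   (+ (:sum acc) v)
    :count (inc (:count acc))}))

(transduce identity stats-rf [3 1 4 1 5])
;; => {:min 1, :max 5, :sum 14, :count 5}
```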
yep, pmap on batches of 1,000 lines gives me the desired speed-up: 350ms vs ~900ms
it still hits me when I have to merge the aggregates (calling the f in merge-with 375k times!); the runtime is back to 2000ms due to that.
thinking about doing that in logarithmic number of stages instead
and I kept fiddling with it
(fn [branch exec combiner mapper init coll]
  (letfn [(m [a']
            (async/go
              (let [l (async/<! (async/into [] (async/merge a')))]
                (async/<! (exec #(reduce combiner init l))))))]
    (reduce
     (fn [a' v]
       (let [a' (conj a' (exec #(combiner init (mapper v))))]
         (if (>= (count a') branch)
           [(m a')]
           a')))
     []
     coll)))
branch is the branching factor, exec is a function that, given a thunk, runs it (on another thread) and returns a channel to communicate the result back, combiner is the folding/aggregating function, mapper is an initial mapping function (parsing here), and coll would be the lines
this code runs in 1090ms:
(time
 (with-open [rdr (io/reader "measurements2.txt")]
   (->> rdr
        line-seq
        (take 1000000)
        (partition-all 1000)
        (pmap agg-batch)
        (partition 32)
        (pmap agg-aggs)
        agg-aggs
        (into (sorted-map))
        report-avgs)))
@U0NCTKEV8 if you haven't thrown away the tree-building function, do you have a complete working example? looks like the one you shared last has a problem: it mixes channels and actually computed values (for me, reduce fails because it expects a seq but gets a channel)
I realized later that of course it doesn't build a complete tree, because it doesn't iterate; it only builds a fixed three levels. But the above definitely doesn't pass a channel to reduce
and m merges and intos and then takes the result from the input channel and feeds it to reduce
True. I need to debug then. Can you share a complete working example of your code somewhere?
I did end up fiddling with it some more and wrote
(defn f [mapping-branch combining-branch exec combiner mapper init coll]
  (letfn [(g [^java.util.concurrent.CompletableFuture cf]
            (.get cf))
          (m [a']
            (let [cf (java.util.concurrent.CompletableFuture.)]
              (.handle
               (java.util.concurrent.CompletableFuture/allOf
                (into-array
                 java.util.concurrent.CompletableFuture
                 a'))
               (reify
                 java.util.function.BiFunction
                 (apply [_ result exception]
                   (if exception
                     (.completeExceptionally cf exception)
                     (exec (fn []
                             (try
                               (.complete cf (transduce (map g) combiner init a'))
                               (catch Throwable t
                                 (.completeExceptionally cf t)))))))))
              cf))
          (c [a]
            (into []
                  (comp (partition-all combining-branch)
                        (map m))
                  a))
          (i [a]
            (if (> (count a) 1)
              (recur (c a))
              a))]
    (i
     (into []
           (comp (partition-all mapping-branch)
                 (map (fn [lines]
                        (exec #(transduce (map mapper) combiner init lines)))))
           coll))))
which iterates and builds a tree, and doesn't use core.async
But the above is all there is; the previous versions are gone, rewritten in a scratch buffer
So I'm pretty happy with my variant that doesn't require extra deps or Java interop. The essence is these 3 lines:
(partition-all 100000)
(pmap agg-batch)
agg-aggs
The tricky part is that finding a sweet spot for the batch size is empirical.
Do you know of any libs that can enable dynamically picking the "right" size for the batch, e.g. by measuring throughput or applying backpressure, to make the whole pipeline go as fast as possible?