This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-12-30
Channels
- # aleph (25)
- # announcements (20)
- # babashka (29)
- # babashka-sci-dev (12)
- # beginners (27)
- # biff (3)
- # clojure (29)
- # clojure-europe (21)
- # clojure-nl (1)
- # clojure-norway (1)
- # clojurescript (3)
- # clr (5)
- # code-reviews (4)
- # data-science (7)
- # datahike (6)
- # datascript (3)
- # emacs (9)
- # fulcro (5)
- # graalvm (10)
- # malli (15)
- # nbb (7)
- # off-topic (17)
- # pathom (9)
- # polylith (4)
- # practicalli (15)
- # reitit (3)
- # releases (2)
- # rum (1)
- # shadow-cljs (73)
- # squint (34)
- # tools-deps (3)
- # xtdb (11)
Hello everyone,
I'm using Aleph + Manifold + byte-streams to build an application that pushes 6 MB per second to a frontend through a websocket. On a recent laptop, I can't sustain more than 1 MB per second, which seems strange.
The data is stored as a vector of byte arrays that I put into the sending stream with the `bs/to-byte-buffers` function. That is the best performance I managed to get.
With htop, I see one core working at ~ 100% and the others doing almost nothing.
Am I missing something? Is there a dedicated data structure conversion that enables Aleph to have a better throughput with a websocket? Or is the websocket the problem?
Thanks a lot.
Hi @U94V75LDV , two things: 1) a lot of us are busy for the holidays, so it may take some time to get back to you, and 2) we could really use some more details, a minimal reproducible example would help a lot
Thanks for the replies. Yes, no hurry, the communication is asynchronous on Slack (as in Aleph :face_with_hand_over_mouth:). I was hoping my problem was trivial and that I had just missed something in the documentation, but it doesn't seem so. I'll take time to describe it more thoroughly tomorrow. Happy new year!
1. There are 1000 TCP streams that regularly push messages of 240 bytes.
2. Those messages are stacked in a Clojure vector.
3. Every n milliseconds the stacked byte arrays are flushed to a web frontend through a websocket connection.
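If it helps to discuss concretely, here's a minimal, hypothetical sketch of steps 2 and 3 (all names are mine, not from the real application): TCP handlers conj their byte arrays onto an atom, and a timer calls `flush!` every 40 ms with a `send!` callback (e.g. one that puts onto the websocket's manifold stream).

```clojure
;; Hypothetical accumulate-and-flush sketch; names are illustrative only.
(def pending (atom []))                         ; vector of byte arrays

(defn on-tcp-message
  "Called once per incoming 240-byte message."
  [^bytes msg]
  (swap! pending conj msg))

(defn flush!
  "Atomically take the accumulated batch and hand it to send!
   (e.g. a fn that puts onto the websocket stream). Does nothing
   when no messages have accumulated."
  [send!]
  (let [[batch _] (swap-vals! pending (constantly []))]
    (when (seq batch)
      (send! batch))))
```

`swap-vals!` (Clojure 1.9+) makes the take-and-reset step atomic, so no message is lost between reading and clearing the atom.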
> What process exactly consumes that core?
> What exactly does it do during that time?
I can see with htop that the Java/Clojure process is at 100% of one CPU. I then tried VisualVM, but I couldn't figure out which thread(s) were consuming CPU or what exactly was happening. I also tried a flame graph with the `flames` package but couldn't read it well enough to understand what was going on.
I lack the skills to profile a Java/Clojure application. Would someone be available for a paid code review to help me with this situation?
@U94V75LDV Hmm, it's interesting that you're seeing so much work on a single core. That suggests it's the aggregator or later, once things have become serial. A few questions:
1. What versions of things are you on?
2. Where precisely are you using byte-streams? Is it for the 1000 TCP streams, or are you using it later to aggregate, or both?
3. If not, how are you aggregating? Are you using transients, or a ByteBuf, or something designed to efficiently accumulate bytes?
4. Is there any/a lot of serializing work being done? The amount of data doesn't seem that large.
I have little experience with VisualVM, but I've used YourKit+IntelliJ pretty successfully for debugging Clojure in the past. I'm on vacation in Japan, but I might be available for a paid review later this week. DM me and maybe we can work something out.
@U10EC98F5 thanks,
> 1. What versions of things are you on?
aleph/aleph {:mvn/version "0.6.0"}
manifold/manifold {:mvn/version "0.3.0"}
org.clj-commons/byte-streams {:mvn/version "0.3.1"}
> 2. Where precisely are you using byte-streams? Are those the 1000 TCP streams, or are you using it later to aggregate, or both?
> 3. If not, how are you aggregating? Are you using transients, or a ByteBuf, or something designed to efficiently accumulate bytes?
The TCP streams (using `aleph.tcp`) seem to return byte arrays (`(class value)` returns `[B`). I then accumulate them in a Clojure vector. Every 40 ms I flush the accumulated byte arrays into a stream that is connected to the socket returned by `(http/websocket-connection req)`. Whether I flush the byte arrays directly or transform them with `bs/to-byte-buffers`, I reach a maximum throughput that seems fairly low and see one core reach 100% usage. I tried accumulating in a transient vector but nothing changed.
> 4. Is there any/a lot of serializing work being done?
No, the bytes are sent "as is".
Enjoy your trip in Japan. I'll DM you for a code review appointment.
Hello @U94V75LDV.
I was also interested in the exercise, so I spent a bit of time on something really naive here (just evaluate line 64): https://github.com/arnaudgeiser/aleph-tests/blob/master/src/aleph_tests/core.clj
Does the design kind of match what you are trying to do?
From my experience, here are some hints:
1. Ensure you are using `raw-stream?` for both your TCP and HTTP servers [1]
2. Do not aggregate your `ByteBuf`s in a vector but in a `CompositeByteBuf` instead [2]
3. Try to send big chunks (64 KiB, the websocket limit) instead of small ones (240 bytes) [3]
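To illustrate hint 2, a minimal sketch of zero-copy aggregation (assuming Netty 4.x; `new-aggregator` and `append!` are hypothetical names, not from the linked repo):

```clojure
(import '(io.netty.buffer ByteBufAllocator ByteBuf CompositeByteBuf))

(defn new-aggregator
  "Allocate a CompositeByteBuf that aggregates components without copying."
  ^CompositeByteBuf []
  (.compositeBuffer ByteBufAllocator/DEFAULT))

(defn append!
  "Add an incoming ByteBuf as a component. The `true` flag advances the
   composite's writer index so the component's bytes become readable."
  [^CompositeByteBuf agg ^ByteBuf buf]
  (.addComponent agg true buf))
```

The composite keeps references to the original buffers instead of copying their bytes, which is exactly what a plain Clojure vector of byte arrays cannot give you.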
A question:
Why not stream to your websocket connection instead of aggregating?
Happy to help if I can but I'm pretty sure that @U10EC98F5 will give you more robust insights as streaming is definitely his expertise.
[1] : https://github.com/arnaudgeiser/aleph-tests/blob/master/src/aleph_tests/core.clj#L54-L55
[2] : https://github.com/arnaudgeiser/aleph-tests/blob/master/src/aleph_tests/core.clj#L17
[3] : https://github.com/arnaudgeiser/aleph-tests/blob/master/src/aleph_tests/core.clj#L44
Another question: what are you doing in your websocket-handler? Having a single CPU core in use might be explained by what you are doing inside that handler.
Here is an alternative that uses a stateful transducer to construct chunks of 64 KiB: https://github.com/arnaudgeiser/aleph-tests/blob/streaming/src/aleph_tests/core.clj
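For anyone curious, here's a simplified, standalone version of the idea (my own naive take, not the code in the linked branch): a stateful transducer that buffers incoming byte arrays and emits one concatenated chunk whenever at least `limit` bytes have accumulated, flushing the remainder on completion.

```clojure
(defn chunking-xf
  "Stateful transducer: buffers byte arrays and emits one concatenated
   byte array whenever at least `limit` bytes have accumulated; any
   remainder is flushed on completion."
  [limit]
  (fn [rf]
    (let [buf  (volatile! [])
          size (volatile! 0)]
      (fn
        ([] (rf))
        ([result]
         ;; flush whatever is left, then complete
         (rf (if (seq @buf)
               (unreduced (rf result (byte-array (mapcat vec @buf))))
               result)))
        ([result ^bytes arr]
         (vswap! buf conj arr)
         (vswap! size + (alength arr))
         (if (>= @size limit)
           (let [chunk (byte-array (mapcat vec @buf))]
             (vreset! buf [])
             (vreset! size 0)
             (rf result chunk))
           result))))))
```

The real code works on `ByteBuf`s rather than copying into fresh byte arrays; this sketch only shows the transducer shape (buffer, threshold emit, completion flush).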
Thanks a lot @UFKCFTVB8. I'll check all of that with great interest and will report back with feedback.
I will post the associated flamegraph later on. There is actually something really interesting on it (related to Aleph internals, not related to the problem you are trying to solve).
Most of the time spent in user land is in `bound-fn*`. Removing this `bound-fn*` from `operation-complete` really helps with the CPU usage.
It has been introduced by the following PR. [1]
It was actually NOT necessary to solve the related issues but we thought it was cheap enough.
[1] : https://github.com/clj-commons/aleph/pull/604
Here is the gist from the discussion we had with @U10EC98F5:
Oh yeah, because the code runs on Netty and signal threads that aren't started by the Clojure test, so there's no automatic thread frame propagation, defeating the use of `binding` under the hood. Well, the usual solution for interop with Java threads is `bound-fn`, and that works here. Truthfully, we could probably use `bound-fn` more when handing things off to Netty to run; it's always safer.
Anyway, this works:
(deftest test-classloader
  (testing "classloader: ensure the class loader is always a DynamicClassLoader"
    (let [result (CompletableFuture.)]
      (with-dynamic-redefs [netty/operation-complete (partial operation-complete result)]
        (let [server (http/start-server
                       (constantly {:body "ok"})
                       {:port 9999})]
          (on-signal :int
                     (bound-fn [_]
                       (.close ^java.io.Closeable server)))
          (.exec (Runtime/getRuntime) (format "kill -SIGINT %s" (pid)))
          (is (= (deref result 30000 ::timeout) true)))))))
(defn wrap-future
  [^Future f]
  (when f
    (if (.isSuccess f)
      (d/success-deferred (.getNow f) nil)
      (let [d (d/deferred nil)
            bound-operation-complete (bound-fn* operation-complete)]
        (.addListener f
          (reify GenericFutureListener
            (operationComplete [_ _]
              (ensure-dynamic-classloader)
              (bound-operation-complete f d))))
        d))))
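To make the conveyance point concrete, a tiny standalone demo (nothing Aleph-specific): a plain fn loses a `binding` when invoked on a raw Java thread, while `bound-fn` captures the binding frame at creation time and re-installs it wherever the fn runs.

```clojure
(def ^:dynamic *ctx* :default)

;; Create both fns inside the same binding scope.
(def fns
  (binding [*ctx* :bound]
    {:plain (fn [] *ctx*)            ; does NOT capture the binding frame
     :bound (bound-fn [] *ctx*)}))   ; captures it and re-installs it

(defn run-on-java-thread
  "Invoke f on a raw java.lang.Thread (one Clojure did not start)
   and return its result."
  [f]
  (let [p (promise)]
    (doto (Thread. #(deliver p (f))) .start .join)
    @p))
```

On a raw thread, `(:plain fns)` sees the root value `:default`, while `(:bound fns)` still sees `:bound`; `future` and `send` convey bindings automatically, but Netty and signal threads do not.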
[9:28] The use of `bound-fn` in the test ensures it inherits the dynamic redef frame, and `bound-fn*` in `wrap-future` ensures the listener will work even if `.close` -> `wrap-future` is called from a signal thread
[9:29] We should probably check everywhere we accept user callbacks for Netty and wrap them in `bound-fn` for safety
I would consider removing that `bound-fn*` on `operation-complete`, as it was clearly introduced only as a safety measure.
What do you think Matthew?
(I will create an issue later today on Aleph)
Here is the PR, open for discussion: https://github.com/clj-commons/aleph/pull/652
@UFKCFTVB8 and @U10EC98F5 thanks a lot. We managed to have something working correctly with your advice.
I discovered the Netty documentation and... it's kind of hard to get used to. It's the first time I've gone this deep into a Java lib, and it's not the same as using ClojureDocs and reading Clojure source code.
@UFKCFTVB8 when I aggregate the `ByteBuf`s returned by the `:raw-stream?` TCP connection (which returns a `PooledUnsafeDirectByteBuf`), I randomly get an `IllegalReferenceCountException` with info about a `.release` method not called before GC and a risk of memory leaks. This error seems to occur when the `ByteBuf`s are kept too long in the `CompositeByteBuf`.
The code of the application should be open-sourced once finished. I'll post the link here when it's done.
Thanks a lot for your time and advice.
During development, I would use this JVM property to ensure all your `ByteBuf`s are correctly released:
-Dio.netty.leakDetection.level=paranoid
This warning means there are `ByteBuf` objects with a refcount of 1 at the moment they are garbage collected. So either you have a dangling `CompositeByteBuf`, or some `ByteBuf`s were never tied to any `CompositeByteBuf`.
Depending on how your solution is implemented, you probably have to call `netty/release` on your `CompositeByteBuf`.
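As an illustration of the ownership rule (a sketch only, and assuming you still own the composite at this point, i.e. nothing downstream such as a Netty write releases it for you):

```clojure
(import '(io.netty.buffer ByteBufAllocator))

;; Whoever owns a ref-counted buffer must release it exactly once,
;; even on error paths, otherwise the leak detector will complain.
(let [agg (.compositeBuffer ByteBufAllocator/DEFAULT)]
  (try
    ;; ... addComponent the incoming ByteBufs, read the aggregated bytes ...
    (finally
      ;; drop our reference; at refCnt 0 Netty reclaims the memory
      (.release agg))))
```

If instead you hand the composite off to something that takes ownership (e.g. writing it to the connection), you must *not* release it yourself, or you'll get the opposite error: a double release.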
Without source code it's really hard to give you more information. 🙂
Here is the documentation regarding leak detection: https://netty.io/wiki/reference-counted-objects.html#leak-detection-levels