Fork me on GitHub

Hello everyone, I'm using Aleph + Manifold + Byte-streams to create an application that pushes 6MB per second to a frontend through a websocket. With a recent laptop, I can't manage to have more than 1MB per second stabilized, which seems strange. The data is stored as a vector of byte arrays that I put in the sending stream with the bs/to-byte-buffers function. That is the best performance that I managed to get. With htop, I see one core working at ~ 100% and the others doing almost nothing. Am I missing something? Is there a dedicated data structure conversion that enables Aleph to have a better throughput with a websocket? Or is the websocket the problem? Thanks a lot.


What process exactly consumes that core? What exactly does it do during that time?

Matthew Davidson (kingmob)12:12:09

Hi @U94V75LDV , two things: 1) a lot of us are busy for the holidays, so it may take some time to get back to you, and 2) we could really use some more details, a minimal reproducible example would help a lot

👍 1

Thanks for the replies. Yes no hurry, the communication is asynchronous on Slack (as in Aleph :face_with_hand_over_mouth:). I was hopping that my problem was trivial and that I would have missed something in the documentation but it doesn't seem so. I'll take time to describe it more thoroughly tomorrow. Happy new year!


Hello again, Here is a schema of the system.


1. There are 1000 TCP streams that push regularly messages of 240 bytes. 2. Those messages are stacked in a Clojure vector 3. Every n milliseconds the stacked byte arrays are flushed to a web frontend through a websocket connection.


> What process exactly consumes that core? > What exactly does it do during that time? I can see with htop that the Java/Clojure process is at 100% of a CPU. Then I tried to use VisualVM but I didn't manage to figure which thread(s) were consuming CPU and what was happening exactly. I tried to use a flame graph with the flames package but did not manage to read it to understand what was happening.


I miss some skills to do profiling on a Java/Clojure application. Would someone have availability for a paid code review to help me on this situation?

Matthew Davidson (kingmob)05:01:53

@U94V75LDV Hmm, it's interesting that you're seeing so much work on a single core. That suggests it's the aggregator or later, when things have become serial. A few questions: 1. What versions of things are you on? 2. Where precisely are you using byte-streams? Are those the 1000 TCP streams, or are you using it later to aggregate, or both? 3. If not, how are you aggregating? Are you using transients, or a ByteBuf, or something designed to efficiently accumulate bytes? 4. Is there any/a lot of serializing work being done? The amount of data doesn't seem that large I have little experience with VisualVM, but I've used YourKit+IntelliJ pretty successfully for debugging Clojure in the past. I'm on vacation in Japan, but I might be available for a paid review later this week. DM me and maybe we can work something out.


@U10EC98F5 thanks, > 1. What versions of things are you on?

aleph/aleph {:mvn/version "0.6.0"}
manifold/manifold {:mvn/version "0.3.0"}
org.clj-commons/byte-streams {:mvn/version "0.3.1"}
> 2. Where precisely are you using byte-streams? Are those the 1000 TCP streams, or are you using it later to aggregate, or both? > 3. If not, how are you aggregating? Are you using transients, or a ByteBuf, or something designed to efficiently accumulate bytes? The TCP streams (using aleph.tcp) seem to return byte arrays (`(class value)` returns [B). I then accumulate them in a Clojure vector. Every 40ms I flush the accumulated byte-streams into the a stream that is connected to the socket returned by (http/websocket-connection req). Whether I flush the byte-arrays directly or I transform them with bs/to-byte-buffers , I reach a maximum throughput that seems fairly low and see a core reaching 100% usage. I tried accumulating in a transient vector but nothing changed. > 4. Is there any/a lot of serializing work being done? No the bytes are sent "as is". Enjoy your trip in Japan. I'll DM you for a code review appointment.

Arnaud Geiser22:01:59

Hello @U94V75LDV. I was also interested by the exercise so I spent a bit of time on something real naive here (just evaluate line 64): Does the design kind of match what you are trying to do? From my experience, here are some hints: 1. Ensure you are using raw-stream? from both your TCP and HTTP servers [1] 2. Do not aggregate your ByteBuf on a vector but on a CompositeByteBuf instead [2] 3. Try to send big chunks (64 KiB - websocket limit) instead of small ones (240 bytes) [3] A question: Why not streaming instead of aggregating with your websocket connection? Happy to help if I can but I'm pretty sure that @U10EC98F5 will give you more robust insights as streaming is definitely his expertise. [1] : [2] : [3] :

Arnaud Geiser22:01:35

Another question: what are you doing in your websocket-handler? Having a single CPU core in use might match what you are doing inside this handler.

Arnaud Geiser12:01:51

Here is an alternative that uses a stateful transducer to construct chunk of 64 KiB.


Thanks a lot @UFKCFTVB8. I'll check all that with great interest and will bring back feedbacks.

Arnaud Geiser13:01:10

I will post the associated flamegraph later on. There is actually something really interesting on it (related to Aleph internals, not related to the problem you are trying to solve).

Arnaud Geiser13:01:51

Most of the time spent on the user land is on bound-fn*

Arnaud Geiser13:01:17

Removing this bound-fn* from operation-complete really helps with the CPU usage. It has been introduced by the following PR. [1] It was actually NOT necessary to solve the related issues but we thought it was cheap enough. [1] :

Arnaud Geiser14:01:13

Here is the gist from the discussion we had with @U10EC98F5: Oh yeah, because the code runs on netty and signal threads that aren’t started by the Clojure test, so there’s no automatic thread frame propagation, defeating the use of binding under the hood. Well, the usual solution to interop with Java threads is bound-fn , and that works here. Truthfully, we could probably use bound-fn more when handing things off to Netty to run, it’s always safer. Anyway, this works:

(deftest test-classloader
  (testing "classloader: ensure the class loader is always a DynamicClassLoader"
    (let [result (CompletableFuture.)]
      (with-dynamic-redefs [netty/operation-complete (partial operation-complete result)]
        (let [server (http/start-server
                      (constantly {:body "ok"})
                      {:port 9999})]
          (on-signal :int
                     (bound-fn [_]
                       (.close ^ server)))
          (.exec (Runtime/getRuntime) (format "kill -SIGINT %s" (pid)))
          (is (= (deref result 30000 ::timeout) true)))))))
(defn wrap-future
  [^Future f]
  (when f
    (if (.isSuccess f)
      (d/success-deferred (.getNow f) nil)
      (let [d (d/deferred nil)
            bound-operation-complete (bound-fn* operation-complete)]
        (.addListener f
          (reify GenericFutureListener
            (operationComplete [_ _]
              (bound-operation-complete f d))))
(modifié) [9 h 28] The use of bound-fn in the test ensures it inherits the dynamic redef frame, and bound-fn* in wrap-future ensures the listener will work even if .close->wrap-future is called from a signal thread [9 h 29] We shold probably check everywhere we accept user callbacks for netty and wrap them in bound-fn for safety

Arnaud Geiser14:01:45

I would consider removing that bound-fn* on operation-complete as it was clearly introduced for safety. What do you think Matthew?

Arnaud Geiser14:01:42

(I will create an issue later today on Aleph)


@UFKCFTVB8 and @U10EC98F5 thanks a lot. We managed to have something working correctly with your advice. I discovered the Netty documentation and... it's kind of hard to get used to. It is the first time I go this deep into a Java lib and it's not the same as using ClojureDocs and reading Clojure source code. @UFKCFTVB8 when I aggregate the ByteBuf returned by by the :raw-stream? TCP connection (which returns a PooledUnsafeDirectByteBuf) I randomly get an IllegalReferenceCountException with info about a .release method not called before GC and a risk of memory leaks. This error seems to come when the ByteBufs are kept too long in the compositeBuffer. The code of the application should be open-sourced once finished. I'll post the link here when it's done. Thanks a lot for your time and advice.

Arnaud Geiser10:01:38

During development, I would use this JVM property to ensure all your ByteBuf are correctly released.

This warning means there are ByteBuf objects with a refcounting of 1 while those can be garbage collected. So either you have a dangling CompositeByteBuf or some ByteBuf that were never tied to any CompositeByteBuf. Depending on how your solution is implemented, you probably have to call netty/release on your CompositeByteBuf . Without source code it's really hard to give you more information. 🙂