Fork me on GitHub
#aleph
<
2019-02-05
>
Empperi12:02:03

Has anyone ever implemented a ManyToManyChannel to ByteBuf conversion? Especially regarding to byte-streams/def-conversion?

Empperi12:02:23

I have never implemented these conversions and have already spent way too much time into this

kachayev13:02:47

@niklas.collin Do you mean “to read until closed and convert to ByteBuf whatever was there”?

Empperi13:02:54

I have actually another problem too but even though that is related it is not strictly the same

kachayev13:02:30

You can try implement this by extending ByteSource protocol, let me find you an example

kachayev13:02:05

I’m not sure if it’s a good thing to do in general… such reader would be blocking

kachayev13:02:49

Maybe there’s another approach for tackling the same problem? Or you’re okay with blocking version?

Empperi14:02:33

Maybe. I'll think about this. Cheers for the example 👍

lmergen14:02:49

@niklas.collin as a matter of fact, i've done exactly this, but it's much simpler

Empperi14:02:07

Sounds promising

lmergen14:02:37

it's pretty evil, though

lmergen14:02:03

i basically use manifold.stream/stream->seq, and then reduce the whole sequence into a single bytebuf 🙂

lmergen14:02:32

(defn ^ByteBuffer concat-bytebufs
  "Takes a collection of bytebuffers and returns a single large concatenated bytebuffer."

  [xs]

  (let [size (->> xs
                  (map #(.remaining %))
                  (reduce +))
        bb (ByteBuffer/allocate size)

        ;; Puts a bytebuffer on the large bytebuffer
        put-on-bb (fn [^ByteBuffer big ^ByteBuffer small]
                    (assert (>= (.remaining big) (.remaining small)))
                    (.put big (.slice small)))]

    (run! (partial put-on-bb bb) xs)
    (.flip bb)))

lmergen14:02:33

it's probably not the best performing way to do it

kachayev14:02:41

@lmergen bytes-streams actually has conversions defined for (seq-of ByteBuffer). Meaning… concat-bytebuf should work out of the box, no?

kachayev14:02:04

@niklas.collin Are we talking about ByteBuf from Netty or ByteBuffer from NIO?

lmergen14:02:05

so that makes me an idiot for writing this code 🙂

Empperi14:02:29

So ByteBuf

lmergen14:02:29

interesting, i did not know about that function

kachayev14:02:34

So, it’s a bit different and I’ve got the question correct

lmergen14:02:20

that's awesome, thanks @kachayev

kachayev14:02:33

stream-of ByteBuffer to InputStream means that everything else should go automatically

Empperi14:02:22

But anyway. Basically what I would like to do is return a core async channel, then let it be automatically serialized as data comes in an async manner

kachayev14:02:25

core.async channel -> manifold.stream is simple

kachayev14:02:28

serialized and… what should happen next?

kachayev14:02:39

written to network?

Empperi14:02:44

Naturally it should be written to the socket

Empperi14:02:13

And yes, websockets

kachayev14:02:25

If you use aleph for handling websocket connection, everything you need to do is to “connect” your core.async channel to manifold.stream that represent the connection

kachayev14:02:12

Aleph handles serialization to ByteBuf from pretty much anything byte-streams can understand

kachayev14:02:44

Basically it means “take everything that comes from here and put it there”. If you need to apply custom serialization, just use manifold.stream/map. You don’t need to worry about netty’s ByteBuf - something that’s a text or binary (like byte array) would work