#aleph
2022-07-30
bpiel16:07:34

Hi. https://github.com/clj-commons/aleph states that: "Aleph exposes data from the network as a https://github.com/clj-commons/manifold stream, which can easily be transformed into a java.io.InputStream" Can anyone tell me how to transform a manifold stream into an InputStream (or an OutputStream)? I've worked with aleph and manifold many times over the years, but it seems like I've never done this. I've read through docs and googled, so I suspect I'm missing something obvious. thanks

dergutemoritz17:07:37

https://github.com/clj-commons/byte-streams would be the idiomatic choice (in the context of aleph at least :))

bpiel17:07:45

@U06GVE6NR I should have mentioned that I've tried that lib out as well, but still have not succeeded

dergutemoritz17:07:55

Oh! How did you try it?

bpiel17:07:01

Let's see

bpiel17:07:04

ok, by "try" I guess I meant that I tried, but failed, to determine (from the docs) which function would be of use here

dergutemoritz17:07:23

Try to-input-stream

bpiel17:07:55

haha...........

dergutemoritz17:07:16

Too obvious? 😄

bpiel17:07:17

hmmm... ok, so apparently I did do that already

bpiel17:07:34

which means that I guess I meant to ask how to go the opposite direction

bpiel17:07:03

but yeah, there was definitely a mismatch between what I was reading/saying and what I wanted to do

dergutemoritz17:07:06

Ah so you want to convert a java.io.OutputStream to a manifold stream instead?

dergutemoritz17:07:22

or what would the opposite direction be?

bpiel17:07:21

I'll elaborate... I want to connect a TCP port with a UNIX domain socket. I'm trying to use aleph/manifold and junixsocket

bpiel17:07:56

The former gives me manifold streams, the latter Input/OutputStreams. It seems like I should be able to use manifold.stream/connect to do this (easily?), but I'm stuck not knowing how to turn the Java I/O streams into a manifold stream. Does this make any sense?

bpiel17:07:29

maybe, let's see...

dergutemoritz17:07:08

Hmm but it doesn't come with an implementation for your case it seems

bpiel17:07:38

yeah, I just tried and didn't work

bpiel17:07:56

I'm sure I can convert the manifold streams to java streams and then write something simple to move the bytes back and forth... it just seems like this is exactly what aleph/manifold would do easily, so I felt dumb doing it outside aleph/manifold

bpiel17:07:49

I'm supposed to take the kids to the pool now, so I need to go.. but thanks for your time and suggestions!

dergutemoritz17:07:36

Try (clj-commons.byte-streams/convert your-junixsocket-input-stream (clj-commons.byte-streams/stream-of bytes)) (and likewise with the output stream) and then s/connect them to your aleph stream

dergutemoritz17:07:20

just not sure if this actually gives you a continuous stream or whether it will block until the io stream closes

dergutemoritz17:07:26

FWIW, netty also comes with some support for Unix Domain Sockets: https://netty.io/4.1/api/io/netty/channel/unix/DomainSocketChannel.html

dergutemoritz17:07:51

Maybe using that will make your life a bit easier

dergutemoritz17:07:58

Ah, you should/could probably use s/splice rather than s/connect above
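
For the InputStream direction, the suggestion above could be sketched roughly as follows. This is only an illustration under assumptions: `unix-in` is a stand-in ByteArrayInputStream rather than a real junixsocket stream, `tcp-side` is a plain Manifold stream standing in for the aleph TCP stream, and whether the conversion delivers data continuously or only once the InputStream closes is not settled here.

```clojure
(require '[clj-commons.byte-streams :as bs]
         '[manifold.stream :as s])
(import '(java.io ByteArrayInputStream))

;; Assumption: stand-in for the InputStream obtained from junixsocket.
(def unix-in (ByteArrayInputStream. (.getBytes "hello")))

;; Assumption: stand-in for the aleph TCP duplex stream.
(def tcp-side (s/stream 16))

;; InputStream -> Manifold source of byte arrays, as suggested above.
(def unix-source (bs/convert unix-in (bs/stream-of bytes)))

;; Forward everything read from the Unix-socket side to the TCP side.
(s/connect unix-source tcp-side)
```

The OutputStream half has no matching conversion, so it still needs hand-written code that calls .write.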

bpiel17:07:47

Awesome. I will try this all when I get back. Thanks!

kingmob06:08:55

Late to the party, did you get this sorted out @U4FFD43T4 ?

bpiel12:08:36

@U10EC98F5 I ended up doing it outside aleph, which is fine, but any advice would be appreciated. thanks

(future
  (try (loop []
         (let [bs @(s/take! s)]
           (when (-> bs count pos?)
             (.write fcos bs))
           (Thread/yield)
           (recur)))
       (catch Throwable ex
         ex)))
(I did something similar to handle the opposite direction)

bpiel12:08:11

...where fcos is an OutputStream

bpiel13:08:07

(also, thanks for maintaining aleph/manifold! They've been central to multiple projects I've done over the past ~5 years)

kingmob14:08:25

Ok, I'll take a deeper look tomorrow. But off the top of my head, I think dergutemoritz has the right idea.

bpiel01:08:35

The issue that I'm having with @U06GVE6NR's suggestion above is that while this works:

(bys/convert (InputStream/nullInputStream)
             (bys/stream-of bytes))
...this does not:
(bys/convert (OutputStream/nullOutputStream)
             (bys/stream-of bytes))

Unhandled java.lang.IllegalArgumentException: Don't know how to convert class java.io.OutputStream$1 into (stream-of [B)

bpiel01:08:25

but, I may be misinterpreting "(and likewise with the output stream)"

kingmob14:08:49

@U4FFD43T4 OK, I’m a bit sleep-deprived, but my take is the discrepancy you’re seeing may be due to the difference in pull vs push stream semantics. Roughly speaking, Clojure seqs and java.io.Input/OutputStreams are pull-based, while Manifold is push-based. Manifold tries to automatically send things downstream, and stops when it can’t push (backpressure); in contrast, seqs/java.io.*Streams do nothing until you tell them to write or read data for you.
It’s easy to make a pull-based source work in Manifold: stream take!s become derefs or .read()s of the underlying source as needed. Likewise for turning a Manifold stream into a seq/j.io.*Stream: backpressure forces manifold to slow down to the rate of seq/j.io.InputStream consumption (e.g., see manifold.stream/stream->seq).
Sinks are a bit harder. Since Manifold is designed to automatically push values downstream as they appear, to turn that into a pull-based sink requires ferrying code to call .write()s, and potentially a buffer in-between (potentially unbounded) or a timeout.
I don’t know Zach’s original rationale, but my guess is that the policy to go from a Manifold stream to an OutputStream might vary between users, so he didn’t want to prescribe what to do. E.g., yoke them, use a buffer, write to a temp file, drop excess mesgs, use background threads or not, etc? Or, it could be he just built it for his own purposes, which seems mostly about getting bytes in, not out. ¯\(ツ)/¯
Your solution to yoke the two together seems fine, btw (although Thread/yield seems sus). Since it only take!s as fast as it .write()s, backpressure forces the stream to slow down to the speed of the OutputStream, so that should be ok.
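
The "yoke them together" approach described above might look something like this minimal sketch. `pump-to-output-stream!` is a hypothetical helper name, not part of Manifold or byte-streams, and a ByteArrayOutputStream stands in for the junixsocket OutputStream. Unlike the loop posted earlier, it stops once take! yields nil (the stream is closed and drained) and does not call Thread/yield, since the blocking deref already paces the loop.

```clojure
(require '[manifold.stream :as s])
(import '(java.io ByteArrayOutputStream OutputStream))

(defn pump-to-output-stream!
  "Hypothetical helper: copies byte arrays from a Manifold source into
  `out` on a background thread. Returns a future that completes once the
  source is closed and drained."
  [source ^OutputStream out]
  (future
    (loop []
      ;; take! yields nil once the source is closed and drained.
      (when-let [bs @(s/take! source)]
        ;; The next take! only happens after this write returns, so
        ;; backpressure slows the source to the OutputStream's pace.
        (.write out ^bytes bs)
        (recur)))
    (.flush out)
    :done))

;; Usage with in-memory stand-ins (assumptions, not real sockets).
(def src (s/stream 16))
(def out (ByteArrayOutputStream.))
(def done (pump-to-output-stream! src out))

@(s/put! src (.getBytes "hello "))
@(s/put! src (.getBytes "world"))
(s/close! src)
@done
```

Any exception from .write surfaces when the returned future is dereferenced, rather than being swallowed inside the loop.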

bpiel15:08:49

@U10EC98F5 Thank you for the thorough and thoughtful response!

bpiel16:07:35

I've tried ->sink and ->source but get exceptions