aleph

bpiel 2022-07-30T16:57:34.519789Z

Hi. https://github.com/clj-commons/aleph states that: "Aleph exposes data from the network as a https://github.com/clj-commons/manifold stream, which can easily be transformed into a java.io.InputStream" Can anyone tell me how to transform a manifold stream into an InputStream (or an OutputStream)? I've worked with aleph and manifold many times over the years, but it seems like I've never done this. I've read through docs and googled, so I suspect I'm missing something obvious. thanks

Matthew Davidson 2022-08-01T06:41:55.483199Z

Late to the party, did you get this sorted out @bpiel ?

bpiel 2022-08-01T12:38:36.750949Z

@kingmob I ended up doing it outside aleph, which is fine, but any advice would be appreciated. thanks

(future
  (try (loop []
         (let [bs @(s/take! s)]
           (when (-> bs count pos?)
             (.write fcos bs))
           (Thread/yield)
           (recur)))
       (catch Throwable ex
         ex)))
(I did something similar to handle the opposite direction)

bpiel 2022-08-01T12:39:11.582679Z

...where fcos is an OutputStream

bpiel 2022-08-01T13:10:07.878319Z

(also, thanks for maintaining aleph/manifold! They've been central to multiple projects I've done over the past ~5 years)

Matthew Davidson 2022-08-01T14:05:25.693409Z

Ok, I'll take a deeper look tomorrow. But off the top of my head, I think dergutemoritz has the right idea.

bpiel 2022-08-02T01:51:35.188669Z

The issue that I'm having with @dergutemoritz suggestion above is that while this works:

(bys/convert (InputStream/nullInputStream)
                 (bys/stream-of bytes))
...this does not:
(bys/convert (OutputStream/nullOutputStream)
                 (bys/stream-of bytes))

Unhandled java.lang.IllegalArgumentException: Don't know how to convert class java.io.OutputStream$1 into (stream-of [B)

bpiel 2022-08-02T01:52:25.723279Z

but, I may be misinterpreting "(and likewise with the output stream)"

Matthew Davidson 2022-08-02T14:44:49.031679Z

@bpiel OK, I’m a bit sleep-deprived, but my take is the discrepancy you’re seeing may be due to the difference in pull vs push stream semantics. Roughly speaking, Clojure seqs and http://java.io.Input/OutputStreams are pull-based, while Manifold is push-based. Manifold tries to automatically send things downstream, and stops when it can’t push (backpressure); in contrast, seqs/java.io.*Streams do nothing until you tell them to write or read data for you. It’s easy to make a pull-based source work in Manifold: stream take!s become derefs or .read()s of the underlying source as needed. Likewise for turning a Manifold stream into a seq/j.io.*Stream: backpressure forces manifold to slow down to the rate of seq/j.io.InputStream consumption (e.g., see manifold.stream/stream->seq). Sinks are a bit harder. Since Manifold is designed to automatically push values downstream as they appear, to turn that into a pull-based sink requires ferrying code to call .write()s, and potentially a buffer in-between (potentially unbounded) or a timeout. I don’t know Zach’s original rationale, but my guess is that the policy to go from a Manifold stream to an OutputStream might vary between users, so he didn’t want to prescribe what to do. E.g., yoke them, use a buffer, write to a temp file, drop excess mesgs, use background threads or not, etc? Or, it could be he just built it for his own purposes, which seems mostly about getting bytes in, not out. ¯\(ツ)/¯ Your solution to yoke the two together seems fine, btw (although Thread/yield seems sus). Since it only take!s as fast as it .write()s, backpressure forces the stream to slow down to the speed of the OutputStream, so that should be ok.

bpiel 2022-08-02T15:08:49.155289Z

@kingmob Thank you for the thorough and thoughtful response!

dergutemoritz 2022-07-30T17:02:37.967939Z

https://github.com/clj-commons/byte-streams would be the idiomatic choice (in the context of aleph at least :))

bpiel 2022-07-30T17:03:45.273159Z

@dergutemoritz I should have mentioned that I've tried that lib out as well, but still have not succeeded

dergutemoritz 2022-07-30T17:03:55.918529Z

Oh! How did you try it?

bpiel 2022-07-30T17:04:01.807159Z

Let's see

bpiel 2022-07-30T17:05:04.013189Z

ok, by "try" I guess I meant that I tried, but failed, to determine (from the docs) which function would be of use here

dergutemoritz 2022-07-30T17:05:23.859649Z

Try to-input-stream

bpiel 2022-07-30T17:05:55.541729Z

haha...........

dergutemoritz 2022-07-30T17:06:16.033569Z

Too obvious? 😄

bpiel 2022-07-30T17:06:17.816709Z

hmmm... ok, so apparently I did do that already

bpiel 2022-07-30T17:06:34.599719Z

which means that I guess I meant to ask how to go the opposite direction

bpiel 2022-07-30T17:07:03.105209Z

but yeah, there was definitely a mismatch between what I was reading/saying and what I wanted to do

dergutemoritz 2022-07-30T17:08:06.833299Z

Ah so you want to convert a java.io.OutputStream to a manifold stream instead?

dergutemoritz 2022-07-30T17:08:22.439279Z

or what would the opposite direction be?

bpiel 2022-07-30T17:09:21.301949Z

I'll elaborate... I want to connect a TCP port with a UNIX domain socket. I'm trying to use aleph/manifold and junixsocket

bpiel 2022-07-30T17:10:56.441729Z

the former gives me manifold streams, the latter Input/OutputStreams It seems like I should be able to use manifold.streams/connect to do this (easily?) but I'm stuck not knowing how to turn the java I/OStreams into a manifold stream Does this make any sense?

bpiel 2022-07-30T17:11:54.299229Z

for reference: https://github.com/kohlschutter/junixsocket

dergutemoritz 2022-07-30T17:14:36.596499Z

Ah! Perhaps https://github.com/clj-commons/byte-streams#transfers is what you're looking for then?

bpiel 2022-07-30T17:15:29.472239Z

maybe, lets' see...

dergutemoritz 2022-07-30T17:16:08.071959Z

Hmm but it doesn't come with an implementation for your case it seems

bpiel 2022-07-30T17:16:38.675829Z

yeah, I just tried and didn't work

bpiel 2022-07-30T17:17:56.040439Z

I'm sure I can convert the manifold streams to java streams and then write something simple to move the bytes back and forth... it just seems like this is exactly what aleph/manifold would do easily, so I felt dumb doing it outside aleph/manifold

bpiel 2022-07-30T17:20:49.574009Z

I'm supposed to take the kids to the pool now, so I need to go.. but thanks for your time and suggestions!

dergutemoritz 2022-07-30T17:21:36.472299Z

Try (clj-commons.byte-streams/convert your-junixsocket-input-stream (clj-commons.byte-streams/stream-of bytes)) (and likewise with the output stream) and then s/connect them to your aleph stream

dergutemoritz 2022-07-30T17:22:20.926179Z

just not sure if this actually gives you a continuous stream or whether it will block until the io stream closes

dergutemoritz 2022-07-30T17:24:26.580539Z

FWIW, netty also comes with some support for Unix Domain Sockets: https://netty.io/4.1/api/io/netty/channel/unix/DomainSocketChannel.html

dergutemoritz 2022-07-30T17:24:51.286509Z

Maybe using that will make your life a bit easier

dergutemoritz 2022-07-30T17:28:58.439729Z

Ah, you should/could probably use s/splice rather than s/connect above

bpiel 2022-07-30T17:30:47.254679Z

Awesome. I will try this all when I get back. Thanks!

🙏 1
bpiel 2022-07-30T16:58:35.675519Z

I've tried ->sink and ->source but get exceptions