This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-07-30
Hi. https://github.com/clj-commons/aleph states that: "Aleph exposes data from the network as a https://github.com/clj-commons/manifold stream, which can easily be transformed into a java.io.InputStream"
Can anyone tell me how to transform a manifold stream into an InputStream (or an OutputStream)?
I've worked with aleph and manifold many times over the years, but it seems like I've never done this. I've read through the docs and googled, so I suspect I'm missing something obvious. Thanks.
https://github.com/clj-commons/byte-streams would be the idiomatic choice (in the context of aleph at least :))
@U06GVE6NR I should have mentioned that I've tried that lib out as well, but still have not succeeded
Oh! How did you try it?
ok, by "try" I guess I meant that I tried, but failed, to determine (from the docs) which function would be of use here
Try to-input-stream
Too obvious? 😄
but yeah, there was definitely a mismatch between what I was reading/saying and what I wanted to do
Ah, so you want to convert a java.io.OutputStream to a manifold stream instead?
or what would the opposite direction be?
I'll elaborate... I want to connect a TCP port with a UNIX domain socket. I'm trying to use aleph/manifold and junixsocket
the former gives me manifold streams, the latter Input/OutputStreams
It seems like I should be able to use manifold.stream/connect to do this (easily?), but I'm stuck not knowing how to turn the Java Input/OutputStreams into a manifold stream
Does this make any sense?
for reference: https://github.com/kohlschutter/junixsocket
Ah! Perhaps https://github.com/clj-commons/byte-streams#transfers is what you're looking for then?
Hmm but it doesn't come with an implementation for your case it seems
I'm sure I can convert the manifold streams to java streams and then write something simple to move the bytes back and forth... it just seems like this is exactly what aleph/manifold would do easily, so I felt dumb doing it outside aleph/manifold
I'm supposed to take the kids to the pool now, so I need to go.. but thanks for your time and suggestions!
Try (clj-commons.byte-streams/convert your-junixsocket-input-stream (clj-commons.byte-streams/stream-of bytes)) (and likewise with the output stream) and then s/connect them to your aleph stream
just not sure if this actually gives you a continuous stream or whether it will block until the io stream closes
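A minimal sketch of the input half of that suggestion, assuming the clj-commons byte-streams and manifold libraries are on the classpath. The ByteArrayInputStream is only a stand-in for the junixsocket InputStream, and the plain manifold stream is a stand-in for the Aleph TCP stream; neither name comes from the thread. Whether chunks arrive continuously on a live socket is not shown here.

```clojure
(require '[clj-commons.byte-streams :as bs]
         '[manifold.stream :as s])
(import '(java.io ByteArrayInputStream))

;; Hypothetical stand-in for the junixsocket InputStream.
(def in (ByteArrayInputStream. (.getBytes "ping" "UTF-8")))

;; InputStream -> manifold source of byte chunks.
(def source (bs/convert in (bs/stream-of bytes)))

;; Hypothetical stand-in for the Aleph TCP stream.
(def sink (s/stream 16))

;; Forward everything from the source into the sink.
(s/connect source sink)

;; Drain the sink, giving up if nothing arrives for one second,
;; and decode each chunk as a string.
(def received
  (->> (s/stream->seq sink 1000)
       (map bs/to-string)
       (apply str)))
```

The one-second timeout passed to stream->seq is only there so the sketch cannot hang if the source never closes.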
FWIW, netty also comes with some support for Unix Domain Sockets: https://netty.io/4.1/api/io/netty/channel/unix/DomainSocketChannel.html
Maybe using that will make your life a bit easier
Ah, you should/could probably use s/splice rather than s/connect above
Late to the party, did you get this sorted out @U4FFD43T4 ?
@U10EC98F5 I ended up doing it outside aleph, which is fine, but any advice would be appreciated. thanks
(future
  (try (loop []
         (let [bs @(s/take! s)]
           (when (-> bs count pos?)
             (.write fcos bs))
           (Thread/yield)
           (recur)))
       (catch Throwable ex
         ex)))
(I did something similar to handle the opposite direction) (also, thanks for maintaining aleph/manifold! They've been central to multiple projects I've done over the past ~5 years)
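For the opposite direction (InputStream into a manifold sink), a rough sketch could look like this. pump-from-input-stream! is a hypothetical helper, not part of aleph or manifold, and the ByteArrayInputStream is a stand-in for the junixsocket stream; the original code for this direction was not posted, so this is an assumption about its shape.

```clojure
(require '[manifold.stream :as s])
(import '(java.io ByteArrayInputStream InputStream)
        '(java.util Arrays))

(defn pump-from-input-stream!
  "Hypothetical helper: reads chunks from `in` on a background thread and
  puts a copy of each chunk onto `sink`. Closes `sink` at end of input and
  stops early if `sink` refuses a put (i.e. it has been closed)."
  [^InputStream in sink]
  (future
    (let [buf (byte-array 4096)]
      (loop []
        (let [n (.read in buf)]
          (if (neg? n)
            (s/close! sink)
            ;; Copy the bytes, because buf is reused on the next read.
            ;; Dereferencing put! blocks until the sink accepts the chunk,
            ;; which is what applies backpressure to the reader.
            (when @(s/put! sink (Arrays/copyOf ^bytes buf (int n)))
              (recur))))))))

;; Usage with stand-in data.
(def in-sink (s/stream 16))
@(pump-from-input-stream!
   (ByteArrayInputStream. (.getBytes "pong" "UTF-8"))
   in-sink)
(def first-chunk (String. ^bytes @(s/take! in-sink) "UTF-8"))
```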
Ok, I'll take a deeper look tomorrow. But off the top of my head, I think dergutemoritz has the right idea.
The issue that I'm having with @U06GVE6NR's suggestion above is that while this works:
(bys/convert (InputStream/nullInputStream)
             (bys/stream-of bytes))
...this does not:
(bys/convert (OutputStream/nullOutputStream)
             (bys/stream-of bytes))
Unhandled java.lang.IllegalArgumentException: Don't know how to convert class java.io.OutputStream$1 into (stream-of [B)
@U4FFD43T4 OK, I’m a bit sleep-deprived, but my take is the discrepancy you’re seeing may be due to the difference in pull vs push stream semantics. Roughly speaking, Clojure seqs and java.io Input/OutputStreams are pull-based, while Manifold is push-based. Manifold tries to automatically send things downstream, and stops when it can’t push (backpressure); in contrast, seqs/java.io.*Streams do nothing until you tell them to write or read data for you.
It’s easy to make a pull-based source work in Manifold: stream take!s become derefs or .read()s of the underlying source as needed. Likewise for turning a Manifold stream into a seq/j.io.*Stream: backpressure forces manifold to slow down to the rate of seq/j.io.InputStream consumption (e.g., see manifold.stream/stream->seq).
Sinks are a bit harder. Since Manifold is designed to automatically push values downstream as they appear, to turn that into a pull-based sink requires ferrying code to call .write()s, and potentially a buffer in-between (potentially unbounded) or a timeout.
I don’t know Zach’s original rationale, but my guess is that the policy to go from a Manifold stream to an OutputStream might vary between users, so he didn’t want to prescribe what to do. E.g., yoke them, use a buffer, write to a temp file, drop excess messages, use background threads or not, etc. Or, it could be he just built it for his own purposes, which seems mostly about getting bytes in, not out. ¯\(ツ)/¯
Your solution to yoke the two together seems fine, btw (although Thread/yield seems sus). Since it only take!s as fast as it .write()s, backpressure forces the stream to slow down to the speed of the OutputStream, so that should be ok.
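A sketch of the same yoking idea without Thread/yield, assuming manifold is on the classpath. pump-to-output-stream! is a hypothetical name, and the ByteArrayOutputStream stands in for the junixsocket OutputStream. The blocking deref of take! already parks the thread until a chunk is available, so no yield is needed. Passing a default value to take! lets the loop tell "stream drained" apart from data; in the loop posted earlier, a closed stream yields nil and the loop keeps spinning.

```clojure
(require '[manifold.stream :as s])
(import '(java.io ByteArrayOutputStream OutputStream))

(defn pump-to-output-stream!
  "Hypothetical helper: takes byte arrays from `source` on a background
  thread and writes them to `out`. Stops when `source` is drained and
  closes `out` on the way out."
  [source ^OutputStream out]
  (future
    (try
      (loop []
        ;; ::drained is returned once the source is closed and empty.
        (let [chunk @(s/take! source ::drained)]
          (when-not (identical? ::drained chunk)
            (.write out ^bytes chunk)
            (.flush out)
            (recur))))
      (finally
        (.close out)))))

;; Usage with stand-in data.
(def src (s/stream 8))
(def out (ByteArrayOutputStream.))
(def done (pump-to-output-stream! src out))

@(s/put! src (.getBytes "hello " "UTF-8"))
@(s/put! src (.getBytes "world" "UTF-8"))
(s/close! src)
@done ; wait for the pump to finish

(def written (String. (.toByteArray out) "UTF-8"))
```

Because the loop only calls take! after the previous write returns, the manifold stream is held to the speed of the OutputStream, which is the backpressure behaviour described above.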
@U10EC98F5 Thank you for the thorough and thoughtful response!