This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
- # aleph (39)
- # announcements (5)
- # babashka (7)
- # beginners (14)
- # biff (1)
- # clj-kondo (7)
- # clojure (38)
- # clojure-chicago (3)
- # clojure-europe (3)
- # clojure-norway (1)
- # clojurescript (8)
- # cursive (17)
- # data-science (6)
- # defnpodcast (3)
- # emacs (4)
- # figwheel-main (1)
- # honeysql (2)
- # hyperfiddle (2)
- # malli (20)
- # missionary (24)
- # off-topic (27)
- # reagent (4)
- # scittle (11)
- # shadow-cljs (51)
- # spacemacs (1)
- # xtdb (2)
Hi. https://github.com/clj-commons/aleph states that: "Aleph exposes data from the network as a https://github.com/clj-commons/manifold stream, which can easily be transformed into a
Can anyone tell me how to transform a manifold stream into an InputStream (or an OutputStream)?
I've worked with
manifold many times over the years, but it seems like I've never done this. I've read through docs and googled, so I suspect I'm missing something obvious. thanks
https://github.com/clj-commons/byte-streams would be the idiomatic choice (in the context of aleph at least :))
@U06GVE6NR I should have mentioned that I've tried that lib out as well, but still have not succeeded
ok, by "try" I guess I meant that I tried, but failed, to determine (from the docs) which function would be of use here
but yeah, there was definitely a mismatch between what I was reading/saying and what I wanted to do
I'll elaborate... I want to connect a TCP port with a UNIX domain socket. I'm trying to use aleph/manifold and junixsocket
the former gives me manifold streams, the latter Input/OutputStreams
It seems like I should be able to use
manifold.streams/connect to do this (easily?) but I'm stuck not knowing how to turn the java I/OStreams into a manifold stream
Does this make any sense?
Ah! Perhaps https://github.com/clj-commons/byte-streams#transfers is what you're looking for then?
I'm sure I can convert the manifold streams to java streams and then write something simple to move the bytes back and forth... it just seems like this is exactly what aleph/manifold would do easily, so I felt dumb doing it outside aleph/manifold
I'm supposed to take the kids to the pool now, so I need to go.. but thanks for your time and suggestions!
(clj-commons.byte-streams/convert your-junixsocket-input-stream (clj-commons.byte-streams/stream-of bytes)) (and likewise with the output stream) and then
s/connect them to your aleph stream
just not sure if this actually gives you a continuous stream or whether it will block until the io stream closes
FWIW, netty also comes with some support for Unix Domain Sockets: https://netty.io/4.1/api/io/netty/channel/unix/DomainSocketChannel.html
@U10EC98F5 I ended up doing it outside aleph, which is fine, but any advice would be appreciated. thanks
(I did something similar to handle the opposite direction)
(future (try (loop  (let [bs @(s/take! s)] (when (-> bs count pos?) (.write fcos bs)) (Thread/yield) (recur))) (catch Throwable ex ex)))
(also, thanks for maintaining aleph/manifold! They've been central to multiple projects I've done over the past ~5 years)
Ok, I'll take a deeper look tomorrow. But off the top of my head, I think dergutemoritz has the right idea.
The issue that I'm having with @U06GVE6NR suggestion above is that while this works:
...this does not:
(bys/convert (InputStream/nullInputStream) (bys/stream-of bytes))
(bys/convert (OutputStream/nullOutputStream) (bys/stream-of bytes)) Unhandled java.lang.IllegalArgumentException: Don't know how to convert class java.io.OutputStream$1 into (stream-of [B)
@U4FFD43T4 OK, I’m a bit sleep-deprived, but my take is the discrepancy you’re seeing may be due to the difference in pull vs push stream semantics. Roughly speaking, Clojure seqs and http://java.io.Input/OutputStreams are pull-based, while Manifold is push-based. Manifold tries to automatically send things downstream, and stops when it can’t push (backpressure); in contrast, seqs/java.io.*Streams do nothing until you tell them to write or read data for you.
It’s easy to make a pull-based source work in Manifold: stream take!s become derefs or .read()s of the underlying source as needed. Likewise for turning a Manifold stream into a seq/j.io.*Stream: backpressure forces manifold to slow down to the rate of seq/j.io.InputStream consumption (e.g., see
Sinks are a bit harder. Since Manifold is designed to automatically push values downstream as they appear, to turn that into a pull-based sink requires ferrying code to call .write()s, and potentially a buffer in-between (potentially unbounded) or a timeout.
I don’t know Zach’s original rationale, but my guess is that the policy to go from a Manifold stream to an OutputStream might vary between users, so he didn’t want to prescribe what to do. E.g., yoke them, use a buffer, write to a temp file, drop excess mesgs, use background threads or not, etc? Or, it could be he just built it for his own purposes, which seems mostly about getting bytes in, not out. ¯\(ツ)/¯
Your solution to yoke the two together seems fine, btw (although Thread/yield seems sus). Since it only take!s as fast as it .write()s, backpressure forces the stream to slow down to the speed of the OutputStream, so that should be ok.