
Hi. The Aleph README states that: "Aleph exposes data from the network as a Manifold stream, which can easily be transformed into a java.io.InputStream". Can anyone tell me how to transform a manifold stream into an InputStream (or an OutputStream)? I've worked with aleph and manifold many times over the years, but it seems like I've never done this. I've read through the docs and googled, so I suspect I'm missing something obvious. Thanks.

dergutemoritz 17:07:37

clj-commons/byte-streams would be the idiomatic choice (in the context of aleph at least :))


@U06GVE6NR I should have mentioned that I've tried that lib out as well, but still have not succeeded


Oh! How did you try it?


Let's see


ok, by "try" I guess I meant that I tried, but failed, to determine (from the docs) which function would be of use here


Try to-input-stream




Too obvious? 😄


hmmm... ok, so apparently I did do that already


which means that I guess I meant to ask how to go the opposite direction


but yeah, there was definitely a mismatch between what I was reading/saying and what I wanted to do


Ah, so you want to convert an InputStream to a manifold stream instead?


or what would the opposite direction be?


I'll elaborate... I want to connect a TCP port with a UNIX domain socket. I'm trying to use aleph/manifold and junixsocket


the former gives me manifold streams, the latter Input/OutputStreams. It seems like I should be able to use manifold.stream/connect to do this (easily?), but I'm stuck not knowing how to turn the Java Input/OutputStreams into a manifold stream. Does this make any sense?


maybe, let's see...


Hmm but it doesn't come with an implementation for your case it seems


yeah, I just tried it and it didn't work


I'm sure I can convert the manifold streams to java streams and then write something simple to move the bytes back and forth... it just seems like this is exactly what aleph/manifold would do easily, so I felt dumb doing it outside aleph/manifold
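
For reference, the "write something simple to move the bytes" approach for the InputStream side might look roughly like the sketch below. `input-stream->source` is a hypothetical helper name, not part of manifold or aleph, and the background `future` and the chunk size are assumptions. It reads blocking chunks and puts each one onto a fresh manifold stream, waiting on every `put!` so the reader never runs ahead of the consumer.

```clojure
(ns example.bridge
  (:require [manifold.stream :as s])
  (:import (java.io InputStream)
           (java.util Arrays)))

(defn input-stream->source
  "Hypothetical helper (not a manifold/aleph API): reads `in` on a background
  thread and puts each chunk, as a fresh byte array, onto a new manifold
  stream. The stream is closed on EOF or on error."
  [^InputStream in chunk-size]
  (let [out (s/stream)
        buf (byte-array chunk-size)]
    (future
      (try
        (loop []
          (let [n (.read in buf)]
            ;; .read returns -1 at EOF, otherwise the number of bytes read
            (when (pos? n)
              ;; put! yields false once `out` has been closed downstream;
              ;; dereferencing it is what applies backpressure to the reader
              (when @(s/put! out (Arrays/copyOf buf n))
                (recur)))))
        (finally
          (s/close! out))))
    out))

;; Usage with an in-memory stream standing in for the junixsocket InputStream
(def src
  (input-stream->source
    (java.io.ByteArrayInputStream. (.getBytes "hello")) 4096))

(def first-chunk (String. ^bytes @(s/take! src)))
```

The resulting `src` is an ordinary manifold source, so it could then be passed to `s/connect` with the aleph connection as the sink.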


I'm supposed to take the kids to the pool now, so I need to go.. but thanks for your time and suggestions!


Try (clj-commons.byte-streams/convert your-junixsocket-input-stream (clj-commons.byte-streams/stream-of bytes)) (and likewise with the output stream) and then s/connect them to your aleph stream


just not sure if this actually gives you a continuous stream or whether it will block until the io stream closes
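
A minimal sketch of that suggestion for the input side, using an in-memory `ByteArrayInputStream` as a stand-in for the junixsocket stream. The `bs` alias and the buffered `sink` stream (standing in for the aleph connection) are assumptions made for illustration. Whether the conversion stays lazy on a long-lived socket, the open question above, is not something this example settles.

```clojure
(require '[clj-commons.byte-streams :as bs]
         '[manifold.stream :as s])

;; Stand-in for the InputStream obtained from junixsocket
(def in (java.io.ByteArrayInputStream. (.getBytes "hello over a socket")))

;; Convert the InputStream into a manifold stream of byte arrays
(def src (bs/convert in (bs/stream-of bytes)))

;; Stand-in for the aleph TCP stream; buffered so the example does not block
(def sink (s/stream 16))

;; Everything that comes out of src is forwarded to sink
(s/connect src sink)

(def received (bs/to-string @(s/take! sink)))
```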


FWIW, netty also comes with some support for Unix Domain Sockets:


Maybe using that will make your life a bit easier


Ah, you should/could probably use s/splice rather than s/connect above
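
To illustrate what `s/splice` does: it joins a sink half and a source half into one duplex stream, which is the shape an aleph TCP connection already has. In the sketch below, `to-unix` and `from-unix` are hypothetical plain manifold streams standing in for the two converted halves of the Unix domain socket.

```clojure
(require '[manifold.stream :as s])

(def to-unix (s/stream))    ; sink half: messages headed for the socket
(def from-unix (s/stream))  ; source half: messages read from the socket

;; put! on the spliced stream goes to to-unix; take! comes from from-unix
(def duplex (s/splice to-unix from-unix))

(s/put! duplex :request)
(def sent @(s/take! to-unix))

(s/put! from-unix :response)
(def got @(s/take! duplex))
```

With a duplex stream on each side, the two directions could then be wired with one `s/connect` call per direction.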


Awesome. I will try this all when I get back. Thanks!

Matthew Davidson (kingmob) 06:08:55

Late to the party, did you get this sorted out @U4FFD43T4 ?


@U10EC98F5 I ended up doing it outside aleph, which is fine, but any advice would be appreciated. thanks

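  (try (loop []
         (let [bs @(s/take! s)]          ; nil once s is closed and drained
           (when bs
             (when (-> bs count pos?)
               (.write fcos bs))
             (Thread/yield)
             (recur))))
       (catch Throwable ex
         nil)) ; handler body not shown in the original message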
(I did something similar to handle the opposite direction)


...where fcos is an OutputStream


(also, thanks for maintaining aleph/manifold! They've been central to multiple projects I've done over the past ~5 years)

Matthew Davidson (kingmob) 14:08:25

Ok, I'll take a deeper look tomorrow. But off the top of my head, I think dergutemoritz has the right idea.


The issue that I'm having with @U06GVE6NR's suggestion above is that while this works:

(bys/convert (InputStream/nullInputStream)
                 (bys/stream-of bytes))
...this does not:
(bys/convert (OutputStream/nullOutputStream)
                 (bys/stream-of bytes))

Unhandled java.lang.IllegalArgumentException: Don't know how to convert class java.io.OutputStream$1 into (stream-of [B)


but, I may be misinterpreting "(and likewise with the output stream)"

Matthew Davidson (kingmob) 14:08:49

@U4FFD43T4 OK, I’m a bit sleep-deprived, but my take is the discrepancy you’re seeing may be due to the difference in pull vs push stream semantics.

Roughly speaking, Clojure seqs and Java Input/OutputStreams are pull-based, while Manifold is push-based. Manifold tries to automatically send things downstream, and stops when it can’t push (backpressure); in contrast, seqs/*Streams do nothing until you tell them to write or read data for you.

It’s easy to make a pull-based source work in Manifold: stream take!s become derefs or .read()s of the underlying source as needed. Likewise for turning a Manifold stream into a seq/InputStream: backpressure forces Manifold to slow down to the rate of seq/InputStream consumption (e.g., see stream->seq).

Sinks are a bit harder. Since Manifold is designed to automatically push values downstream as they appear, turning that into a pull-based sink requires ferrying code to call .write()s, and potentially a buffer in between (potentially unbounded) or a timeout.

I don’t know Zach’s original rationale, but my guess is that the policy for going from a Manifold stream to an OutputStream might vary between users, so he didn’t want to prescribe what to do. E.g., yoke them, use a buffer, write to a temp file, drop excess msgs, use background threads or not, etc.? Or, it could be he just built it for his own purposes, which seem to be mostly about getting bytes in, not out. ¯\_(ツ)_/¯

Your solution to yoke the two together seems fine, btw (although Thread/yield seems sus). Since it only take!s as fast as it .write()s, backpressure forces the stream to slow down to the speed of the OutputStream, so that should be ok.
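
The backpressure point can be seen in a few lines at the REPL. This is only a sketch, assuming an unbuffered `(s/stream)`; the names `st` and `pending` are made up for the example. The deferred returned by `put!` stays unrealized until something takes the message, which is why a loop that only calls `take!` as fast as it can `.write` ends up throttling the producer.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(def st (s/stream))                 ; unbuffered: nothing can be parked in it

(def pending (s/put! st :chunk))    ; a producer offers a message

;; No consumer yet, so the put has not completed
(def before-take (d/realized? pending))

;; A consumer pulls the message (in the thread above: followed by a .write)
(def taken @(s/take! st))

;; Only now does the producer's put! complete
(def after-take @pending)
```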


@U10EC98F5 Thank you for the thorough and thoughtful response!


I've tried ->sink and ->source but get exceptions