#aleph
2021-03-11
ABeltramo15:03:15

Hello everyone, @flowthing suggested I post my question here (I also posted it in the clojure channel). Here it goes:

ABeltramo15:03:44

I'm using Manifold and I can't wrap my head around splitting a stream. Here's some example code:

(require '[clojure.test :refer [deftest]]
         '[manifold.stream :as ms])

(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/consume (fn [e]
                  (if (condition e)
                    (ms/put! pos-stream e)
                    (ms/put! neg-stream e)))
                stream)

    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))

(deftest streamz
  (let [rand-stream (ms/->source (repeatedly #(- (rand) 0.5)))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
It's a simple example that should capture my problem: I have a fn that generates values and puts them into a stream. I need to process that stream in pairs based on a condition. Can I split a stream into two streams? How can I accomplish that?

ABeltramo15:03:08

I tried something like

(defn split-stream [condition stream]
  [(ms/filter condition stream)
   (ms/filter (comp not condition) stream)])
but this doesn't seem to work with a single stream. I guess I could have two separate input streams, but that would mean discarding valid inputs.

ABeltramo15:03:07

(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    pos-stream)

    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    neg-stream)

    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))
Doing something like this seems to result in waiting forever. I guess you can't connect two streams to a single stream?

lilactown15:03:46

when I limit the repeatedly call to 10 elements above, it seems to work

ABeltramo15:03:10

Which split-stream works for you with the 10-element limit?

lilactown15:03:44

the first one you pasted in the channel

lilactown15:03:58

also just return a vector with [(ms/filter ,,,) (ms/filter ,,,)]

ABeltramo15:03:45

the first one should work because I'm forcing the consume, but it doesn't seem like a good solution (since there's no end to the generator)

Matthew Davidson (kingmob)22:03:43

Bit late to the party, but consume isn’t what you want in the middle of a stream. It’s more a terminal thing.

lilactown15:03:57

going to 1000 seems to lock up my REPL

lilactown15:03:02

so this seems to be an issue trying to chunk the stream of values

ABeltramo15:03:19

Maybe I should take a different approach and make a transducer to use with transform. I don't really need two streams; I need pairs.
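One way to read "I need pairs" as a transducer: a minimal sketch, assuming it is acceptable to pair values in arrival order (first unmatched positive with first unmatched negative). `pair-by` is a hypothetical name, not part of Manifold or clojure.core. It only uses clojure.core, so it works with `into`/`sequence` on plain seqs.

```clojure
(defn pair-by
  "Hypothetical stateful transducer: routes each input into one of two
  FIFO queues depending on `pred`, and emits a [match non-match] pair
  as soon as both queues are non-empty. Unpaired leftovers are dropped
  when the input ends."
  [pred]
  (fn [rf]
    (let [yes (volatile! clojure.lang.PersistentQueue/EMPTY)
          no  (volatile! clojure.lang.PersistentQueue/EMPTY)]
      (fn
        ([] (rf))
        ([result] (rf result)) ; leftovers in the queues are discarded
        ([result x]
         ;; queue the value on the side selected by pred
         (if (pred x)
           (vswap! yes conj x)
           (vswap! no conj x))
         ;; emit a pair only when both sides have something waiting
         (if (and (seq @yes) (seq @no))
           (let [pair [(peek @yes) (peek @no)]]
             (vswap! yes pop)
             (vswap! no pop)
             (rf result pair))
           result))))))

;; Finite input: runs of same-sign values are queued, not lost
(into [] (pair-by pos?) [1 2 3 -4 1 2 3 -4])
;; => [[1 -4] [2 -4]]

;; Infinite input: sequence applies the transducer lazily
(take 3 (sequence (pair-by pos?) (repeatedly #(- (rand) 0.5))))
```

With Manifold, the same transducer should be usable as `(ms/transform (pair-by pos?) rand-stream)`, since `transform` takes a transducer; that yields one stream of pairs and avoids zipping two streams. Two caveats: the queues are unbounded, so a long run of same-sign values is held in memory instead of blocking the producer, and `sequence` realizes output in chunks, so it may pull somewhat more input than `take 3` strictly needs.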

lilactown15:03:50

I guess I just don't see where the issue is yet.

ABeltramo16:03:32

Don't worry, it's probably me; I'm very new to Manifold! The thing is that I would like to split a stream into two separate streams based on a condition. I would like to generate stuff once and put it in the right stream; at the same time, I would like to generate only when someone needs to take downstream from the two child streams.

Matthew Davidson (kingmob)22:03:03

“I would like to generate only when someone needs to take downstream from the two child streams.” This is a bit antithetical to how Manifold streams work. Manifold streams (and most stream libs) are push-based. A value enters the top of the stream, and a chain of callbacks and/or continuations propagates it automatically. What you describe is closer to standard lazy seqs, or the reactive-streams specification (e.g., java.util.concurrent.Flow), which are pull-based.

Matthew Davidson (kingmob)22:03:39

(That being said, a seq source wrapped around an infinite seq like repeatedly will pull values on demand)
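To see the pull-based behaviour of a plain lazy seq, here is a small check with no Manifold involved that counts how often the generator runs. `calls` and `gen` are illustrative names, with `gen` standing in for the expensive generator. `repeatedly` builds an unchunked lazy seq, so the function runs only when an element is actually realized, and realized elements are cached.

```clojure
(def calls (atom 0))

(defn gen
  "Stand-in for an expensive generator: counts its invocations."
  []
  (swap! calls inc)
  (- (rand) 0.5))

(def xs (repeatedly gen)) ; nothing generated yet

@calls              ;; => 0
(doall (take 3 xs)) ; realizes exactly three elements
@calls              ;; => 3
(doall (take 3 xs)) ; same elements again, served from the cache
@calls              ;; => 3
```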

lilactown16:03:48

with the examples I've been working through, my REPL eventually locks up trying to run them

lilactown16:03:55

I guess the problem is that the producer that connects the streams together needs to buffer the values

ABeltramo16:03:04

(deftest streamz
  (let [rand-stream-1 (ms/->source (repeatedly  #(- (rand) 0.5)))
        rand-stream-2 (ms/->source (repeatedly  #(- (rand) 0.5)))]
    (->> (ms/zip (ms/filter pos? rand-stream-1)
                 (ms/filter neg? rand-stream-2))
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
For example, this works fine, but in my use case generating is an expensive operation, and I'd really prefer not to waste generated values by filtering them out.

lilactown16:03:14

(require '[manifold.deferred :as m]
         '[manifold.stream :as ms])

(defn split-stream [condition stream]
  [(ms/buffer 100 (ms/filter condition stream))
   (ms/buffer 100 (ms/filter (comp not condition) stream))])

(comment
  (let [rand-stream (->> (repeatedly 100 #(- (rand) 0.5))
                         (ms/->source))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ;; ... Process pairs of [pos neg] ...
         (ms/consume
          (fn [[a b]]
            (prn a b)))))
  )

lilactown16:03:24

this appears to work for me

ABeltramo16:03:40

Thanks, I'll try it out. Just from looking at it: is it working because you produce 100 elements and the buffers have the same size? I don't know in advance how many elements I have to produce; I'd like that to be driven from downstream, hence the take at the bottom.

lilactown16:03:04

yes, I think the problem is that the zip will repeatedly take! from each stream, and if you end up with a stream that doesn't alternate between the two conditions, you end up in a deadlock

ABeltramo16:03:01

So, for example, the issue will be when you generate batch-size elements that are all positive, am I right?

lilactown16:03:26

or just something like:

(let [rand-stream (->> [1 2 3 -4 1 2 3 -4]
                       (ms/->source))]
  ,,,)

ABeltramo16:03:22

I see, but I have an infinite generator of numbers! 😅

ABeltramo16:03:16

I think I'll go with the two generator streams and the filter that discards elements. If anyone has a better solution, please ping me!

lilactown16:03:31

yeah, the issue is not the total number of positive or negative but the fact that you might end up needing to put! multiple positive or negative numbers in a row

lilactown16:03:22

in this example, without buffering, the producer blocks trying to put! the 2 value on the filter pos? stream until it's consumed, and the zip consumer won't take! from the filter pos? stream until it can take! a value from the filter not pos? stream
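A rough way to see why no fixed buffer size is safe: track how many values are waiting on one side for a partner on the other. This is a minimal plain-Clojure sketch; `backlog` is a hypothetical helper that treats every non-matching value as the other side, like `(comp not condition)`. It models only the pairing imbalance, not Manifold's exact buffer accounting (zip, for instance, also holds a value it has already taken).

```clojure
(defn backlog
  "Hypothetical helper: for each prefix of xs, how many more values have
  gone to the `pred` side than to the other side. Positive numbers mean
  `pred` values are waiting for a partner; negative numbers mean the
  other side is waiting."
  [pred xs]
  (reductions + (map #(if (pred %) 1 -1) xs)))

(backlog pos? [1 2 3 -4 1 2 3 -4])
;; => (1 2 3 2 3 4 5 4)

;; Largest number of values one side had to hold for this input
(apply max (map #(max % (- %)) (backlog pos? [1 2 3 -4 1 2 3 -4])))
;; => 5
```

For an input like `(repeatedly #(- (rand) 0.5))` this running total behaves like a symmetric random walk, which sooner or later exceeds any fixed bound, so a fixed-size buffer can postpone the deadlock but not rule it out.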

ABeltramo16:03:20

Thank you @lilactown for your help, I got a better understanding of the issue now!

lilactown16:03:46

perhaps you could do the filtering before creating the stream? I don't really know what this looks like in your application

lilactown16:03:27

ex. this works:

(let [rand-seq (repeatedly #(- (rand) 0.5))
      pos (ms/->source (filter pos? rand-seq))
      neg (ms/->source (filter neg? rand-seq))]
  (->> (ms/zip pos neg)
       ;; ... Process pairs of [pos neg] ...
       (ms/stream->seq)
       (take 10)
       prn))

lilactown16:03:34

since you said that the generation is expensive, this wraps it in a single sequence but then creates two different producers on top of it

ABeltramo16:03:07

Ah! You might be right!

ABeltramo16:03:22

So instead of doing the filtering inside the stream, I generate two lazy seqs from the original sequence.

lilactown16:03:16

I think the key insight is to create two sources based on the original sequence, I don't think it matters whether you do the filtering as a seq or a stream

lilactown16:03:22

(let [rand-seq (repeatedly #(- (rand) 0.5))
      pos (->> (ms/->source rand-seq)
               (ms/filter pos?))
      neg (->> (ms/->source rand-seq)
               (ms/filter neg?))]
  (->> (ms/zip pos neg)
       ;; ... Process pairs of [pos neg] ...
       (ms/stream->seq)
       (take 10)
       prn))

ABeltramo16:03:22

Right, right, I see. And this won't call rand-seq twice?

lilactown16:03:29

I can't imagine how it would

lilactown16:03:14

that would break a lot of assumptions I have about lazy seqs
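lilactown's intuition can be checked without Manifold: two filtered views over the same lazy seq share its cached elements, so the generator runs once per element no matter how many consumers walk it. This is a minimal sketch using a counting stand-in instead of `rand`; `shared-seq-demo` is a hypothetical name. The same reasoning should carry over to two `ms/->source` wrappers over one seq, since both walk the same seq object.

```clojure
(defn shared-seq-demo
  "Walks two filtered views of one lazy seq and reports how many elements
  the underlying generator actually produced."
  []
  (let [calls (atom 0)
        xs    (repeatedly #(swap! calls inc)) ; yields 1, 2, 3, ...
        odds  (filter odd? xs)
        evens (filter even? xs)
        o     (doall (take 3 odds))  ; realizes elements 1..5
        e     (doall (take 3 evens))] ; reuses 1..5, realizes only 6
    {:odds o :evens e :calls @calls}))

(shared-seq-demo)
;; => {:odds (1 3 5), :evens (2 4 6), :calls 6}
```

Only six elements are generated, not five for one view plus six for the other. The trade-off: while one side lags behind, every element between the two consumers stays in memory, so a long run of same-sign values costs memory here rather than blocking a producer.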

ABeltramo17:03:44

I'm sorry, I need some hammock time to think about it but it really seems the solution I was looking for. @lilactown thank you so much for the help!

lilactown17:03:47

you're welcome!

lilactown17:03:33

I learned a bunch as well 😄 tbh I only have a surface knowledge of manifold

lilactown17:03:37

interestingly this is related to a problem I've been thinking about in asynchronous programming. would it be okay if I took the example we've been talking about here and used it in a blog post?

ABeltramo17:03:28

You sure can! There's nothing confidential in the example I made.

pablore17:03:21

@beltramo.ale You tried using transform with (juxt filter remove)?

ABeltramo17:03:29

I tried and failed, but this was in the middle of trying out @lilactown's solutions, so I might have missed something.

ABeltramo17:03:14

I saw that usage of juxt when I was looking for a transducer to use, and I stumbled upon the separate fn.

pablore17:03:48

(juxt filter remove) solves your problem for sequences
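For plain seqs, pablore's suggestion looks like this: a minimal sketch using only clojure.core. `(juxt filter remove)` returns a function that calls both `filter` and `remove` with the same arguments and collects the results in a vector, i.e. `[matching non-matching]`.

```clojure
;; Note that remove keeps 0, unlike (filter neg? ...)
((juxt filter remove) pos? [1 -2 3 -4 0])
;; => [(1 3) (-2 -4 0)]

;; On an infinite lazy seq both halves stay lazy and share the realized
;; elements, the same idea as the two-source solution earlier in the thread
(let [[pos neg] ((juxt filter remove) pos? (repeatedly #(- (rand) 0.5)))]
  (take 3 (map vector pos neg)))
```

Each half could then be wrapped with `ms/->source` and zipped, as in lilactown's snippets.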