#aleph
2017-06-07
hansen-pansen 17:06:29

Ugh. Bumped into another thing in #aleph I do not understand. While — thanks to @dm3 — I could run manifold.deferred/futures in a custom thread pool, I now struggle to do the same via manifold.stream. I am using stream.onto to “assign” the same thread pool to the stream. But the execution is serial.

hansen-pansen 17:06:44

What I was expecting: the put!s finish immediately, and after one second I get 10 “Received:” lines at once. What I get instead: The put!s finish immediately, and every second I get a “Received:” line, taking 10 seconds in total. From https://github.com/ztellman/manifold/blob/master/docs/execution.md I understand that I do not need to put the sleep function into a future (as in my previous try with manifold.deferred).

dm3 17:06:45

@hansen-pansen you are supposed to respect backpressure when put!ting into a stream, i.e. (dotimes [_ num] @(s/put! x delay)). Due to the stream x having a default buffer of size 1, the items will be put! and processed by the sleep function one at a time.
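
A minimal sketch of the deref'd `put!` loop described above, assuming Manifold is on the classpath. The names `x`, `n`, and `received` are hypothetical and only serve the illustration; the consumer here just records messages instead of sleeping.

```clojure
(require '[manifold.stream :as s])

;; Hypothetical names, chosen for illustration only.
(def n 3)
(def x (s/stream))

;; Consumer: record every message the stream delivers.
(def received (atom []))
(s/consume #(swap! received conj %) x)

;; Producer: deref each put! so the loop waits until the stream has
;; accepted the message before offering the next one.
(dotimes [i n]
  @(s/put! x i))

(s/close! x)
```

Without the `@`, the loop would only create deferreds and move on, regardless of whether the stream had accepted anything.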

dm3 17:06:59

there’s also put-all! for putting multiple items
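
For comparison, a small sketch of `put-all!`, which takes the sink and a collection of messages and returns a deferred that yields `true` if every put succeeded. The stream `x` and the `received` atom are hypothetical names for this illustration.

```clojure
(require '[manifold.stream :as s])

;; Hypothetical stream and collector, for illustration only.
(def x (s/stream))
(def received (atom []))
(s/consume #(swap! received conj %) x)

;; put-all! offers the messages in order and pauses when the sink
;; applies backpressure; deref the result to wait for the whole batch.
(def all-accepted? @(s/put-all! x (range 5)))

(s/close! x)
```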

dm3 17:06:44

the stream callbacks will run on the supplied pool executor, but the operations on the stream will not be parallelized automatically
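
One way to get parallel processing anyway is to hand each message to the pool explicitly from inside the callback. The following is a sketch under assumptions, not the code from the original question: the pool size, the one-second sleep, and the names `pool`, `x`, and `results` are all hypothetical. It relies on `consume` not waiting for the deferred its callback returns.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])
(import 'java.util.concurrent.Executors)

;; Hypothetical pool and workload, chosen only for illustration.
(def pool (Executors/newFixedThreadPool 10))
(def x (s/stream 10))
(def results (atom []))

;; The callback only schedules the slow work on the pool and returns,
;; so the next message can be taken while earlier ones are still running.
(s/consume
  (fn [msg]
    (d/future-with pool
      (Thread/sleep 1000)          ; stand-in for the real work
      (swap! results conj msg)))
  x)

;; Producer side: still deref each put! to respect backpressure.
(dotimes [i 10]
  @(s/put! x i))

;; Call (.shutdown pool) once all work has been submitted and is no longer needed.
```

With this shape the number of messages in flight is bounded by the pool and its queue rather than by the stream, so a real queue/worker setup would probably want an explicit limit as well.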

hansen-pansen 17:06:24

@dm3 Well, I tried to use a buffered-stream, too, but same result. If I had all the inputs from the beginning, I would use ->source or put-all! instead. But I am trying to use manifold for a queue/worker model, where inputs are generated from web requests.

dm3 17:06:06

that’s great

dm3 17:06:19

producer puts

dm3 17:06:21

consumer takes

dm3 17:06:47

buffer size is the number of requests you allow to queue up, which depends on the max allowed latency

ehashman 18:06:14

@hansen-pansen I believe what @dm3 is saying is that you will need to initialize x with a buffer, i.e. bind it to (s/stream 10) as opposed to (s/stream) (which only has a buffer for one item)

ehashman 18:06:32

streams automatically deal with backpressure

ehashman 18:06:58

so they will not attempt to process additional items when they are blocking on the number of deferreds in the buffer

ehashman 18:06:08

if the buffer is size 1, it will block on that one item
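
The buffer behavior can be observed directly. This sketch assumes a hypothetical stream with room for two messages and no consumer attached; the names `p1` to `p3` are made up for the illustration.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical stream with room for two messages and no consumer attached.
(def x (s/stream 2))

(def p1 (s/put! x :a)) ; fits in the buffer
(def p2 (s/put! x :b)) ; fits in the buffer
(def p3 (s/put! x :c)) ; buffer is full, so this deferred stays pending

;; put! never blocks the calling thread; only the returned deferred
;; says whether the message has been accepted yet.
(def accepted-before-take (mapv d/realized? [p1 p2 p3]))

;; Taking one message frees a slot, which lets the pending put! complete.
(def first-msg @(s/take! x))
(def p3-result (deref p3 1000 ::timeout))
```

This is also why un-deref'd `put!`s appear to "get through" at once: the calls return immediately, and the waiting happens only when the deferred is dereferenced.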

hansen-pansen 18:06:44

@ehashman This is what I also tried, but it's not depicted in the above example. I used x (stream/buffered-stream num), so all put!s should “get through”.

hansen-pansen 18:06:38

Also: my put!s did not block.

dm3 18:06:57

you have to deref the put!

hansen-pansen 18:06:39

BAAM This could be the thing!