This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-06-07
Ugh. Bumped into another thing in #aleph I do not understand. While, thanks to @dm3, I could run manifold.deferred/future calls in a custom thread pool, I now struggle to do the same via manifold.stream. I am using stream.onto to “assign” the same thread pool to the stream. But the execution is serial.
What I was expecting: the put! calls finish immediately, and after one second I get 10 “Received:” lines at once. What I get instead: the put! calls finish immediately, and every second I get a “Received:” line, taking 10 seconds in total.
From https://github.com/ztellman/manifold/blob/master/docs/execution.md I understand that I do not need to put the sleep function into a future (as in my previous try with manifold.deferred).
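The snippet this message refers to is not preserved in the log. A minimal sketch of the setup as described might look like the following; the names pool, x, and sleep, the pool size, and the one-second delay are assumptions based on the surrounding messages, not the original code.

```clojure
(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

;; Hypothetical reconstruction of the described setup, not the original code.
(def pool (ex/fixed-thread-executor 10))

;; A plain stream with no explicit buffer size.
(def x (s/stream))

(defn sleep
  "Simulates a slow job: blocks for ms milliseconds, then reports."
  [ms]
  (Thread/sleep ms)
  (println "Received:" ms))

;; Run the consumer callback on the custom pool via onto.
(s/consume sleep (s/onto pool x))

;; put! returns a deferred right away, so this loop does not block.
(dotimes [_ 10]
  (s/put! x 1000))
```

With a setup like this, the consumer handles one message at a time, which matches the serial behaviour reported above.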
@hansen-pansen you are supposed to respect backpressure when putting into a stream with put!, i.e. (dotimes [_ num] @(s/put! x delay)). Due to the stream x having a default buffer of size 1, the items will be put! and processed by the sleep function one at a time.
the stream callbacks will run on the supplied pool executor, but the operations on the stream will not be parallelized automatically
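Since the stream itself does not fan work out, one possible way to get parallel workers is to have the consumer callback hand each message to the pool and return immediately. This is only a sketch under assumptions: nobody in the conversation proposed it, and the names pool, x, and work, as well as the sizes, are made up for illustration.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; Assumed pool size and buffer size, chosen only for illustration.
(def pool (ex/fixed-thread-executor 10))
(def x (s/stream 10))

(defn work
  "Simulates a slow job: blocks for ms milliseconds, then reports."
  [ms]
  (Thread/sleep ms)
  (println "Received:" ms))

;; The callback only schedules the job on the pool and returns at once,
;; so the stream can deliver the next message while earlier jobs still run.
(s/consume (fn [ms] (d/future-with pool (work ms))) x)

;; Deref each put! to respect backpressure, as suggested above.
(dotimes [_ 10]
  @(s/put! x 1000))
```

The trade-off is that the stream no longer limits how many jobs are in flight; the size of the thread pool does.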
@dm3 Well, I tried to use a buffered-stream, too, but same result.
If I had all the inputs from the beginning, I would use ->source or put-all! instead. But I am trying to use manifold for a queue/worker model, where inputs are generated from web requests.
@hansen-pansen I believe what @dm3 is saying is that you will need to initialize x with a buffer, i.e. bind it to (s/stream 10) as opposed to (s/stream) (which only has a buffer for one item)
so they will not attempt to process additional items when they are blocking on the number of deferreds in the buffer
see also https://github.com/ztellman/manifold/blob/master/docs/stream.md#buffers-and-backpressure
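To see how the buffer size affects put!, a small experiment along these lines can be run at the REPL. It is a sketch for checking the behaviour yourself; the stream names and the :job message are placeholders.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Two streams without consumers: one created with no buffer argument,
;; one with room for 10 messages.
(def unbuffered (s/stream))
(def buffered (s/stream 10))

;; put! always returns a deferred; whether it is already realized
;; shows whether the message was accepted without waiting.
(def p1 (s/put! unbuffered :job))
(def p2 (s/put! buffered :job))

(println "unbuffered put! realized?" (d/realized? p1))
(println "buffered put! realized?" (d/realized? p2))
```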
@ehashman This is what I also tried, but it's not depicted in the above example. I used x (stream/buffered-stream num), so all put! calls should “get through”.
Also: my put! calls did not block.
BAAM This could be the thing!