#aleph
2017-06-08
hansen-pansen08:06:04

@dm3 @ehashman Thank you very much for the hints and your continuous help! I read the mentioned link again (which, together with the codox documentation and the repository's doc folder, is basically the only documentation I know of) and adapted my example to use (s/stream 10). But it doesn't change the result, and I believe I don't understand the concept of when and where callbacks are performed. I suspect that my sleep function does not get assigned to the pool thread pool via stream/map, or that stream/consume plays a role.

dm309:06:44

can you paste your whole program now?

hansen-pansen09:06:38

Sure, thank you.
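
The pasted snippet is not preserved in this log; based on the surrounding messages it presumably looked roughly like this (a reconstruction only; the sleep fn and the printed strings are guesses):

(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

(defn sleep [ms]
  (Thread/sleep ms)
  (println "Slept" ms "ms")
  ms)

(def pool (ex/fixed-thread-executor 10 {:initial-thread-count 10}))
(def a    (s/stream 10))

;; map `sleep` over the stream, with callbacks running on `pool`,
;; and print each result
(s/consume #(println "Got" %) (s/map sleep (s/onto pool a)))

(println "Sending …")
(dotimes [_ 10] @(s/put! a 1000))
(println "… finished.")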

dm309:06:18

and what do you expect should happen in the above?

hansen-pansen09:06:46

1. Sending … and … finished. appear immediately (i.e., a batch of put!s does not block)
2. After one second, I see 10 Slept 1000 ms messages

hansen-pansen09:06:02

I suspect the stream/consume blocks the pool.

dm309:06:26

hm, it works as I’d expect locally (with 0.1.7-alpha)

dm309:06:10

puts return immediately when x is buffered - (s/stream 10)

dm309:06:23

then sleep gets called every second 10 times

hansen-pansen09:06:42

Well, expectation 1 was met 🙂

hansen-pansen09:06:49

I’ll try with the alpha right now…

hansen-pansen09:06:56

Nope, not for me. Clojure 1.8 or 1.9 alpha?

dm309:06:09

boot.user=> (def a (s/stream 10))
#'boot.user/a
boot.user=> (def pool (manifold.executor/fixed-thread-executor 10 {:initial-thread-count 10}))
#'boot.user/pool
boot.user=> (s/consume #(println "Got" %) (s/map sleep (s/onto pool a)))
<< … >>
boot.user=> (do (println "Started" (java.util.Date.)) (dotimes [_ 10] @(s/put! a 1000)) (println "Done" (java.util.Date.)))

dm309:06:24

I mean manifold 0.1.7-alpha5

hansen-pansen09:06:32

I know. I tried manifold 0.1.7 alpha, but see “my” behaviour. That’s why I also asked for the Clojure version. I will try with your version. Right now, I am a Leiningen user, not a Boot user. Just a sec.

dm309:06:46

clojure/lein/boot don’t matter

hansen-pansen09:06:11

I know / hope. But I want to try your exact snippet.

dm309:06:11

also the behaviour should be consistent between Manifold 0.1.5-0.1.7

hansen-pansen09:06:32

I’ll try it out, but I need to head out for lunch first. Sorry, that’s not very polite of me.

dm310:06:45

no problem 🙂 it’s asynchronous

hansen-pansen12:06:29

😂 Yes, that’s truly asynchronous.

hansen-pansen12:06:10

I think I should re-phrase expectation 2: I expect that after one second I see all 10 Got or Slept … messages at once, as those 10 sleeps are executed in parallel.

hansen-pansen12:06:23

Instead, I see a Got and Slept … every second, which takes 10 seconds in total, as they run sequentially.

dm312:06:39

yes, that’s how the stream is consumed - sequentially

dm312:06:07

there’s no parallel-consume in manifold
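
To illustrate (a minimal sketch along the lines of the snippets above): consume dispatches one message at a time and waits for each callback to return before dispatching the next, so blocking work in the callback serializes even though the put!s themselves return immediately.

(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

(def pool (ex/fixed-thread-executor 10 {:initial-thread-count 10}))
(def src  (s/stream 10))            ; buffered, so the put!s below don't block

;; the callback runs on `pool`, but only for one message at a time
(s/consume (fn [ms]
             (Thread/sleep ms)      ; blocking work inside the callback
             (println "Slept" ms "ms"))
           (s/onto pool src))

(dotimes [_ 10] @(s/put! src 1000))
;; the "Slept 1000 ms" lines appear roughly one per second, ~10 s in total,
;; even though the pool has 10 threads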

hansen-pansen12:06:45

My assumption was that when the stream is consumed, the sleep gets pushed to the worker pool, then the next item is fetched immediately, pushed to the pool, and so on.

dm312:06:16

no, although I can see how that would be a valid assumption

hansen-pansen12:06:48

Okay, then I should build a loop that take!s from the stream and creates a manifold.deferred pool (which I tried a while ago, where you helped me, too).

dm312:06:59

I’m not sure how you want to signal backpressure to the source

dm312:06:25

you can have a token system where you take! as many times as you have free workers
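
A sketch of that token idea (names and structure invented here): a semaphore with one permit per worker, a dedicated dispatcher thread that only take!s when a permit is free, and plain futures for the work itself.

(require '[manifold.stream :as s])

(defn parallel-consume
  "Sketch only: runs (f msg) for each message on src, at most n at a time.
  Backpressure: take! only happens when a worker permit is free, so a full
  worker pool eventually causes upstream put!s to park on the stream buffer."
  [n f src]
  (let [permits (java.util.concurrent.Semaphore. n)]
    (future                                ; dedicated dispatcher thread
      (loop []
        (.acquire permits)                 ; wait for a free worker slot
        (let [msg @(s/take! src ::drained)]
          (if (= msg ::drained)
            (.release permits)
            (do (future                    ; run the work off the dispatcher thread
                  (try (f msg)
                       (finally (.release permits))))
                (recur))))))))

;; usage (hypothetical):
;; (parallel-consume 10 (fn [ms] (Thread/sleep ms) (println "Slept" ms "ms")) src)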

hansen-pansen12:06:03

Yes, this is why I initially went to Manifold. I assumed the stream would block when the thread pool is full, and therefore couldn’t be consumed immediately.

hansen-pansen12:06:09

So I completely misunderstood the idea of the executor on the stream. I must admit, right now I don’t see any reason for it at all.

dm312:06:49

it’s there to control where the execution happens
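
In other words (a small sketch): onto only chooses which threads run the downstream callbacks; it does not fan the work out across the pool.

(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

(def pool (ex/fixed-thread-executor 10 {:initial-thread-count 10}))
(def src  (s/stream 10))

;; the map/consume callbacks below run on threads from `pool`,
;; but still for one message at a time
(s/consume println (s/map inc (s/onto pool src)))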

hansen-pansen12:06:08

Should I put a deferred (like (future sleep)) onto the stream?

dm313:06:30

consume-parallel is trickier than I thought 🙂

hansen-pansen13:06:43

I think I will just move back to core.async/pipeline-blocking.
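
For reference, the core.async route mentioned here would look roughly like this (a sketch; channel sizes and the transform are placeholders):

(require '[clojure.core.async :as a])

(def in  (a/chan 10))
(def out (a/chan 10))

;; 10 threads apply the blocking transform in parallel; output order is preserved
(a/pipeline-blocking 10 out
                     (map (fn [ms] (Thread/sleep ms) (str "Slept " ms " ms")))
                     in)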

dm313:06:28

what’s your end goal? process items from the stream in parallel?

dm313:06:44

or partition the stream into N streams?

hansen-pansen13:06:10

Exactly. I have a function a that gets called irregularly via a web service and returns a value. This value has to be processed by another function b, which triggers an HTTP request and some computation. I want to decouple a from b. My idea is that a feeds the value into a stream, and at the sink a worker pool performs b in parallel, putting the results onto another stream.
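
A sketch of that shape (names and buffer sizes invented; b stands for the HTTP-plus-computation step): n dedicated worker threads take! from the input stream, run b, and put! the results onto an output stream.

(require '[manifold.stream :as s])

(def requests (s/stream 10))    ; fed by `a`, e.g. from the web handler
(def results  (s/stream 10))    ; consumed by whatever needs b's output

(defn start-workers!
  "Sketch only: n threads, each blocking on take!/put!, so a slow consumer of
  `results` backpressures all the way to the put!s into `requests`."
  [n b]
  (dotimes [_ n]
    (future
      (loop []
        (let [v @(s/take! requests ::drained)]
          (when-not (= v ::drained)
            @(s/put! results (b v))
            (recur)))))))

;; (start-workers! 10 b)
;; in the web handler: (s/put! requests value-from-a)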

hansen-pansen13:06:30

So your first suggestion.

hansen-pansen14:06:00

Interesting. (s/consume) is definitely the “blocker” here. Even if I map a future on the stream, derefing the future blocks the rest. If I consume, but do not deref, everything runs in parallel.
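
The two variants being compared, roughly (a sketch; sleep and the stream are stand-ins):

(require '[manifold.stream :as s])

(defn sleep [ms] (Thread/sleep ms) ms)
(def src (s/stream 10))

;; variant 1: deref inside consume: the deref blocks the consume callback,
;; and the observed behaviour stays sequential
(s/consume (fn [fut] (println "Got" @fut))
           (s/map #(future (sleep %)) src))

;; variant 2: no deref: the futures all run in parallel, but the results go
;; nowhere and there is no backpressure
;; (s/consume (fn [_] nil) (s/map #(future (sleep %)) src))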

dm314:06:56

I couldn’t figure out how to make the parallelization work on the onto thread pool

dm314:06:25

actually I have something 🙂

dm314:06:14

(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(require '[manifold.executor :as ex])

(defn round-robin-fork
  [src n]
  (let [dsts (take n (repeatedly s/stream))   ; one destination stream per "worker"
        ;; queue of destinations that are currently free to receive a message
        ^java.util.concurrent.BlockingQueue ready
        (doto (java.util.concurrent.ArrayBlockingQueue. n)
          (.addAll dsts))

        free-up! #(.offer ready %)
        next! #(.take ready)
        ;; put a message onto a destination; once the put completes, mark the
        ;; destination as free again, or close it if the put was rejected
        send! #(-> (s/put! %1 %2)
                   (d/chain
                     (fn [result]
                       (if result
                         (free-up! %1)
                         (s/close! %1)))))]
    ;; take from src; when src is drained, close all destinations; otherwise
    ;; hand the message to the currently free destination on a future thread,
    ;; then block for the next free destination before looping
    (d/loop [dst (.take ready)]
      (-> (s/take! src ::none)
          (d/chain
            (fn [result]
              (if (= result ::none)
                (doseq [d dsts]
                  (s/close! d))
                (do (future (send! dst result))
                    (d/chain (next!)
                      #(d/recur %))))))))
    dsts))

;; Test
(def s (s/stream))
(def ss (round-robin-fork s 10))

;; each destination stream gets its own consumer, sleeping for a random time
(doseq [[idx s] (map-indexed vector ss)]
  (s/consume
    #(let [sleep (rand-int 1000)]
       (Thread/sleep sleep)
       (println (Thread/currentThread) "DST [" idx "] slept for " sleep " ms, got: " %))
    s))

(dotimes [i 20]
  (println  "putting " i)
  @(s/put! s i))

dm314:06:24

However the work here is performed on the default future executor

dm314:06:52

however you could pass a pool as a third argument

dm314:06:58

and use it in future-with
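
Roughly what that change would look like (a sketch of the suggestion, untested): give round-robin-fork an extra executor argument and swap the clojure future for manifold.deferred/future-with.

(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; e.g. (def work-pool (ex/fixed-thread-executor 10 {:initial-thread-count 10}))
;; then, inside the d/loop, replace
;;   (future (send! dst result))
;; with
;;   (d/future-with work-pool (send! dst result))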

dm314:06:59

the name is from the previous iteration, should probably be just fork

dm314:06:15

also if you do onto and use the same executor to run the tasks inside fork - the thing will deadlock somewhere - not sure where