This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-06-08
@dm3 @ehashman Thank you very much for the hints and your continuous help!
I read the mentioned link again (which, together with the codox documentation and the repository doc folder, is basically the only documentation I know of) and adapted my example to use (s/stream 10). But it doesn’t change the result, and I believe I do not understand the concept of when and where callbacks are performed. I suspect that either my sleep function does not get assigned to the pool thread pool via stream/map, or stream/consume plays a role.
Sure, thank you.
1. Sending … and … finished. appear immediately (i.e., a batch of put!s does not block)
2. After one second, I see 10 Slept 1000 ms
I suspect that stream/consume blocks the pool.
Well, expectation 1 was met 🙂
I’ll try with the alpha right now…
Nope, not for me. Clojure 1.8 or 1.9 alpha?
boot.user=> (def a (s/stream 10))
#'boot.user/a
boot.user=> (def pool (manifold.executor/fixed-thread-executor 10 {:initial-thread-count 10}))
#'boot.user/pool
boot.user=> (s/consume #(println "Got" %) (s/map sleep (s/onto pool a)))
<< … >>
boot.user=> (do (println "Started" (java.util.Date.)) (dotimes [_ 10] @(s/put! a 1000)) (println "Done" (java.util.Date.)))
I know. I tried manifold 0.1.7 alpha, but I still see “my” behaviour. That’s why I also asked for the Clojure version. I will try with your version. Right now I am a Leiningen user, not a boot user. Just a sec.
I know / hope. But I want to try your exact snippet.
I’ll try it out, but I need to head out for lunch. Sorry, this is not very respectful of me.
😂 Yes, that’s truly asynchronous.
I think I should re-phrase expectation 2: I expect that after 1 second I see all 10 Got or Slept … at once, as those 10 sleeps get executed in parallel.
Instead, I see every second a Got and Slept …, which in total takes 10 seconds, as they run sequentially.
My assumption was that when the stream is consumed, the sleep gets pushed to the worker pool, and the next item is fetched immediately and put into the pool, and so on.
Okay, then I should build a loop that take!s from the stream and create a manifold.deferred pool (which I tried a while ago, where you helped me, too).
Yes, this is why I initially went to manifold. I assumed the stream would block when the thread pool is full, and therefore couldn’t consume immediately.
So I completely misunderstood the idea of the executor on the stream. I must admit, I don’t see a reason at all right now for it.
Should I put a deferred (like (future sleep)) onto the stream?
I think I will just move back to core.async/pipeline-blocking.
Exactly. I have a function a that gets called irregularly via a web service and returns a value. This value has to be processed by another function b, which triggers an HTTP request and some computation. I want to decouple a from b. My idea is that a feeds the value into a stream, and at the sink a worker pool performs b in parallel and puts the result onto another stream.
So your first suggestion.
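The decoupling described above can be sketched without manifold or core.async, using only java.util.concurrent queues and a fixed thread pool. This is a minimal sketch under assumptions, not the approach the thread settled on: b, start-workers!, process-all, and the ::done sentinel are hypothetical names, and the sleep merely stands in for the HTTP request and computation.

```clojure
(import '(java.util.concurrent Executors ExecutorService LinkedBlockingQueue TimeUnit))

;; Hypothetical stand-in for b: the sleep simulates the HTTP request and computation.
(defn b [x]
  (Thread/sleep 200)
  (* x x))

(defn start-workers!
  "Starts n worker threads. Each takes values from in, applies f, and puts
  the result on out, until it sees the ::done sentinel."
  [n f ^LinkedBlockingQueue in ^LinkedBlockingQueue out]
  (let [pool (Executors/newFixedThreadPool (int n))]
    (dotimes [_ n]
      ;; .execute accepts only a Runnable, so passing a Clojure fn is unambiguous.
      (.execute pool
                (fn []
                  (loop []
                    (let [v (.take in)]
                      (when-not (= v ::done)
                        (.put out (f v))
                        (recur)))))))
    pool))

(defn process-all
  "Feeds xs into the input queue (the role of a), waits for the workers to
  finish, and returns the sorted results from the output queue."
  [n xs]
  (let [in  (LinkedBlockingQueue.)
        out (LinkedBlockingQueue.)
        ^ExecutorService pool (start-workers! n b in out)]
    (doseq [x xs] (.put in x))
    ;; One sentinel per worker so that every worker loop terminates.
    (dotimes [_ n] (.put in ::done))
    (.shutdown pool)
    (.awaitTermination pool 10 TimeUnit/SECONDS)
    ;; Workers finish in arbitrary order, so sort for a stable result.
    (vec (sort out))))
```

Calling (process-all 4 [1 2 3 4]) runs the four slow calls on four threads at once. In a long-running service the producer would keep putting values on the input queue instead of sending the sentinels right away.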
Interesting. (s/consume) is definitely the “blocker” here. Even if I map a future on the stream, derefing the future blocks the rest. If I consume, but do not deref, everything runs in parallel.
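The effect described above, where dereferencing each future as soon as it is created serializes the work, can be reproduced with plain clojure.core futures and no manifold at all. This is a minimal sketch: slow-inc, run-sequential, run-parallel, and elapsed-ms are hypothetical names, and it illustrates the blocking effect only, not how s/consume is implemented.

```clojure
;; Hypothetical slow task standing in for the sleep function from the thread.
(defn slow-inc [x]
  (Thread/sleep 100)
  (inc x))

(defn run-sequential
  "Starts one future per item but derefs it immediately, so the next
  future is not even created until the previous one has finished."
  [xs]
  (mapv (fn [x] @(future (slow-inc x))) xs))

(defn run-parallel
  "Starts all futures first and derefs them only afterwards, so the
  sleeps overlap."
  [xs]
  (let [futs (mapv (fn [x] (future (slow-inc x))) xs)]
    (mapv deref futs)))

(defn elapsed-ms
  "Calls the zero-argument function f and returns the elapsed wall-clock
  time in milliseconds."
  [f]
  (let [start (System/nanoTime)]
    (f)
    (/ (- (System/nanoTime) start) 1e6)))
```

Both functions return the same values; comparing (elapsed-ms #(run-sequential (range 5))) with (elapsed-ms #(run-parallel (range 5))) shows the difference in wall-clock time.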
(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(require '[manifold.executor :as ex])
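;; Fans values from src out to n destination streams. A destination is
;; handed the next value only after its previous put! has completed.
(defn round-robin-fork
  [src n]
  (let [dsts (take n (repeatedly s/stream))
        ;; Queue of destinations that are currently free to receive a value.
        ^java.util.concurrent.BlockingQueue ready
        (doto (java.util.concurrent.ArrayBlockingQueue. n)
          (.addAll dsts))
        free-up! #(.offer ready %)
        next! #(.take ready)
        ;; Put the value; on success mark the destination free again,
        ;; otherwise close it.
        send! #(-> (s/put! %1 %2)
                   (d/chain
                     (fn [result]
                       (if result
                         (free-up! %1)
                         (s/close! %1)))))]
    (d/loop [dst (.take ready)]
      (-> (s/take! src ::none)
          (d/chain
            (fn [result]
              (if (= result ::none)
                ;; src is closed and drained: close every destination.
                (doseq [d dsts]
                  (s/close! d))
                (do (future (send! dst result))
                    (d/chain (next!)
                             #(d/recur %))))))))
    dsts))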
;; Test
(def s (s/stream))
(def ss (round-robin-fork s 10))
(doseq [[idx s] (map-indexed vector ss)]
  (s/consume
    #(let [sleep (rand-int 1000)]
       (Thread/sleep sleep)
       (println (Thread/currentThread) "DST [" idx "] slept for " sleep " ms, got: " %))
    s))
(dotimes [i 20]
  (println "putting " i)
  @(s/put! s i))
You rock!