This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-01-23
A small question: what is the best way to implement a “worker” entity? By this I mean a separate loop/recur running in the background that consumes a stream. Say I have a stream of events and would like to process them with four workers.
Well, I guess you could use manifold.stream/consume or manifold.stream/consume-async, at least. Or, depending on what you're trying to do, you could also look at manifold.bus.
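A minimal sketch of the consume approach, assuming a manifold stream of events; handle-event here is a hypothetical handler standing in for whatever processing you need:

```clojure
(require '[manifold.stream :as s])

(def events (s/stream))

;; consume attaches a callback that is invoked for each message
;; put onto the stream; it returns a deferred that resolves when
;; the stream is drained and closed.
(s/consume (fn [event] (handle-event event)) events)
```

Note that consume processes messages one at a time, so on its own it gives you a single consumer rather than four parallel workers.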
@U1WAUKQ3E we have map-concurrently, which uses stream-buffers to control concurrency of non-blocking ops (or Futures wrapping blocking ops) - https://gist.github.com/mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9
our general pattern is to map-concurrently, then reduce to consume the stream
we've also wrapped all the core manifold.stream functions to propagate errors sensibly - errors during map get captured in a marker record on the output stream, and returned (as an errored deferred) by any reduce
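An illustrative sketch of that error-propagation idea (the actual wrappers live in the linked gist; everything below, including the StreamError record and the safe-* names, is an assumption about the shape of such a wrapper):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Marker record carrying an error downstream instead of killing
;; the stream at the point of failure.
(defrecord StreamError [error])

(defn safe-map
  "Like s/map, but an exception in f becomes a StreamError value
  on the output stream rather than an unhandled error."
  [f stream]
  (s/map (fn [v]
           (try (f v)
                (catch Exception e (->StreamError e))))
         stream))

(defn safe-reduce
  "Like s/reduce, but a StreamError encountered on the stream is
  surfaced to the caller as an errored deferred."
  [f init stream]
  (s/reduce (fn [acc v]
              (if (instance? StreamError v)
                (d/error-deferred (:error v))
                (f acc v)))
            init
            stream))
```

The point of the marker record is that a failure in one element doesn't tear down the mapping stage; the error travels with the data and only materializes when someone consumes the stream.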
Still, I’m looking for something like this:
(let [stream (...)
      limit  4]
  ;; doseq iterates a seq, so wrap the count in range
  (doseq [n (range limit)]
    (spawn-worker stream msg-function)))
we would do (->> stream (map-concurrently 4 #(run-task whatever %)) (reduce conj [])) to run 4 concurrent tasks over a stream - we don't really use "workers" as such, which would probably need a queue to feed them, etc.
most of our tasks are non-blocking, but for the (rare) cases in our platform where run-task is a blocking thing we wrap it in a deferred/future
(let [stream (get-stream)
      number 4]
  ;; doseq iterates a seq, so wrap the worker count in range
  (doseq [n (range number)]
    (d/loop []
      (let [message (get-from-stream stream)]
        (process-message message)
        (d/recur)))))
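A non-blocking variant of the loop above can be sketched with s/take! and d/chain, so that waiting for the next message doesn't block a thread. This assumes stream is a manifold stream and process-message is the caller's handler; spawn-workers is a hypothetical name:

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn spawn-workers
  "Start n asynchronous worker loops, each taking messages from
  `stream` until it is drained. Returns the loops' deferreds."
  [n stream process-message]
  (doall
    (for [_ (range n)]
      (d/loop []
        (d/chain (s/take! stream)
                 (fn [message]
                   (when message            ;; nil => stream closed
                     (process-message message)
                     (d/recur))))))))
```

s/take! returns a deferred that yields nil once the stream is closed and drained, which is what terminates each worker loop here.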