#aleph
2020-01-23
igrishaev10:01:02

A small question: what is the best way to implement a “worker” entity? By this I mean a separate loop/recur in the background that consumes a stream. Say I have a stream of events and would like to process them with four workers.

flowthing10:01:03

Well, I guess you could use manifold.stream/consume or manifold.stream/consume-async, at least. Or, depending on what you're trying to do, you could also look at manifold.bus.

mccraigmccraig12:01:27

@U1WAUKQ3E we have map-concurrently which uses stream-buffers to control concurrency of non-blocking ops (or Futures wrapping blocking ops) - https://gist.github.com/mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9

igrishaev12:01:34

Thanks, looks interesting, let me read.

mccraigmccraig12:01:13

our general pattern is to map-concurrently then reduce to consume the stream

mccraigmccraig12:01:18

we've also wrapped all the core manifold.stream functions to propagate errors sensibly - errors during map get captured in a marker record on the output stream, and returned (as an errored deferred) by any reduce

igrishaev12:01:38

Still, I’m looking for something like this:

(let [stream (...)
      limit 4]
  (dotimes [_ limit]
    (spawn-worker stream msg-function)))

igrishaev12:01:59

What should be in spawn-worker?

mccraigmccraig13:01:37

we would do (->> stream (map-concurrently 4 #(run-task whatever %)) (reduce conj [])) to run 4 concurrent tasks over a stream - we don't really use "workers" as such, which would probably need a queue to feed them etc.

mccraigmccraig13:01:08

most of our tasks are non-blocking, but for the (rare) cases in our platform where run-task is a blocking thing we wrap it in a deferred/future
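
A rough sketch of that pipeline shape, using only core manifold.stream functions. The names map-concurrently-sketch and run-task are hypothetical, and this is not the code from the linked gist; it only assumes that a bounded buffer of unrealized deferreds is what limits the concurrency, so roughly n tasks are in flight at a time. The blocking task is wrapped in d/future so it runs off the calling thread.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical stand-in for map-concurrently; not the gist's implementation.
(defn map-concurrently-sketch
  [n f source]
  (->> source
       (s/map f)          ; f returns a deferred for each message
       (s/buffer n)       ; a bounded number of unrealized deferreds wait here
       (s/realize-each))) ; downstream sees realized values

;; Hypothetical blocking task, wrapped in d/future so it returns a deferred.
(defn run-task [x]
  (d/future
    (Thread/sleep 50)
    (* x x)))

;; Run the tasks over a finite source and collect the results.
(def result
  @(->> (s/->source (range 8))
        (map-concurrently-sketch 4 run-task)
        (s/reduce conj [])))
```

The final s/reduce returns a deferred, so dereferencing it waits until the source is drained and every task has finished.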

igrishaev10:01:40

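(let [stream (get-stream)
      number 4]
  ;; dotimes rather than doseq: doseq cannot iterate over a number
  (dotimes [_ number]
    (d/loop []
      ;; d/chain also covers get-from-stream returning a deferred
      (d/chain (get-from-stream stream)
               (fn [message]
                 (process-message message)
                 (d/recur))))))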

cddr11:01:56

The devil is in the detail. Do you care about order? What if item-a gets popped off the stream by the first loop and for some reason takes a long time to process, while item-b gets popped off by the second loop and is processed immediately?

igrishaev12:01:39

No, the order is not important.
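
With ordering out of the picture, one possible shape for spawn-worker is sketched here. It assumes the stream is a Manifold stream, that the handler may block (hence d/future), and that a worker should stop once the stream is drained; the ::drained and ::done markers are illustrative choices, not something from the discussion above.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn spawn-worker
  "Takes messages from stream one at a time and calls f on each.
  Returns a deferred that yields ::done once the stream is drained."
  [stream f]
  (d/loop []
    (d/chain (s/take! stream ::drained)
             (fn [msg]
               (if (identical? ::drained msg)
                 ::done
                 ;; run f off the calling thread, then take the next message
                 (d/chain (d/future (f msg))
                          (fn [_] (d/recur))))))))

;; Usage: four workers competing for messages on one stream.
(def events (s/stream 16))
(def workers
  (doall (repeatedly 4 #(spawn-worker events println))))
```

Each message is taken by exactly one worker, so they share the load without any ordering guarantee. If f throws, that worker's deferred ends in an error and the worker stops, so a production version would likely want error handling around the call.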