
Hey, I'm (still) trying to understand pipeline-blocking and its uses. I did a REPL experiment:

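```clojure
(require '[clojure.core.async :as async])

(let [stdin (async/chan)
      stdout (async/chan)
      prn2 #(locking prn (prn %))]          ; serialize printing across threads
  (async/pipeline-blocking 2
                           stdout
                           (map #(doto % prn2))
                           stdin)
  (async/go-loop []
    (when-let [x (async/<! stdout)]
      (prn2 [:stdout x]))
    ;; note: there is no (recur) here, so this loop reads a single value and exits
    (prn2 [:close]))
  (dotimes [i 10]
    (prn2 [:put i (async/put! stdin i)]))
  (async/close! stdin))
```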
It behaves like this:
- All 10 put! calls return true
- Only 5 (the first 5) elements get processed
- stdout receives only 1 processed result, then a close
- In VisualVM, I see a lot of "zombie threads"

None of these behaviors make sense to me. Where can I read more about it? Are there usage examples?

Alex Miller (Clojure team) 13:09:12

put! is async, so it will basically queue up 10 pending puts (it returns true if the channel isn't closed, and it's not)

Alex Miller (Clojure team) 13:09:30

the "zombie" threads are probably not "zombies"; they are mostly daemon threads in pools waiting for work and will go away if they don't get any. The go-loop is backed by a pool, and pipeline-blocking is backed by a pool, so I'd expect threads from both of those

Alex Miller (Clojure team) 13:09:16

you're using a transducer but unbuffered in/out channels; that may be an issue. There could also be an unhandled exception happening there
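A minimal sketch of how such an exception can be surfaced instead of silently dropping an item, assuming core.async 1.2+ (for to-chan!); safe-pipeline-demo is a hypothetical name. pipeline-blocking accepts optional close? and ex-handler arguments; by default an exception thrown by the transducer is passed to the thread's uncaught-exception handler and the item is dropped, while a custom ex-handler's non-nil return value is placed on the output in the failed item's slot.

```clojure
(require '[clojure.core.async :as async])

(defn safe-pipeline-demo
  "Hypothetical demo: divides 10 by each input; the 0 input throws
  inside the transducer."
  []
  (let [from (async/to-chan! [1 0 2])
        to   (async/chan 3)]
    (async/pipeline-blocking 1 to (map #(/ 10 %)) from
                             true                    ; close? : close `to` once `from` is drained
                             (fn [^Throwable ex]     ; ex-handler: non-nil result replaces the failed item
                               {:error (.getMessage ex)}))
    (async/<!! (async/into [] to))))
```

Calling (safe-pipeline-demo) should return [10 {:error "Divide by zero"} 5]: the failing input shows up as an error value in order, rather than disappearing.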

Alex Miller (Clojure team) 13:09:54

if you change stdin / stdout to have a buffer, e.g. (async/chan 10), does that change what you see?
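A sketch of the same experiment with both channels buffered, wrapped in a hypothetical run-experiment function so the collected results can be inspected. It also adds the recur that the original go-loop lacks: without it the loop takes one value and exits, which matches seeing a single :stdout followed by :close, and with nothing left reading stdout the pipeline is likely to back up and stall.

```clojure
(require '[clojure.core.async :as async])

(defn run-experiment
  "Hypothetical rerun of the REPL experiment: buffered channels and a
  consumer that keeps reading until stdout closes."
  []
  (let [stdin  (async/chan 10)                  ; buffered, as suggested
        stdout (async/chan 10)
        prn2   #(locking prn (prn %))           ; serialize printing across threads
        done   (async/go-loop [seen []]
                 (if-let [x (async/<! stdout)]
                   (do (prn2 [:stdout x])
                       (recur (conj seen x)))   ; keep consuming until stdout closes
                   (do (prn2 [:close])
                       seen)))]
    ;; pipeline-blocking closes stdout after stdin is closed and drained
    (async/pipeline-blocking 2 stdout (map #(doto % prn2)) stdin)
    (dotimes [i 10]
      (prn2 [:put i (async/put! stdin i)]))
    (async/close! stdin)
    (async/<!! done)))                          ; wait for the consumer to finish
```

(run-experiment) should print each step and return [0 1 2 3 4 5 6 7 8 9].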


@alexmiller I'm trying to implement a "Clojure-idiomatic" HTTP server on top of Java's ServerSocket (for learning/fun). ATM I'm using java.util.concurrent.Executors/newFixedThreadPool as the thread pool. Does it make sense to use pipeline-blocking in this case?


pipeline-blocking and pipeline act like their own additional threadpools; they won't use threads from your threadpool


you can use pipeline-async to run tasks on your own threadpool
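A minimal sketch of that approach, assuming core.async 1.2+ (for to-chan!); run-on-pool is a hypothetical helper. pipeline-async expects a function of an input value and a result channel that returns immediately; here that function hands the work to a caller-supplied ExecutorService, and the task puts its result on the result channel and closes it. Outputs still come back in input order.

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

(defn run-on-pool
  "Hypothetical helper: applies f to each input on the given
  ExecutorService with parallelism n, returning results in input order."
  [^ExecutorService pool n f inputs]
  (let [from (async/to-chan! inputs)
        to   (async/chan n)]
    (async/pipeline-async
      n to
      (fn [x result-ch]
        ;; must return immediately: the real work runs on our pool
        (.execute pool
                  (fn []
                    (try
                      (async/>!! result-ch (f x))
                      ;; always close, or the pipeline waits forever on this slot
                      (finally (async/close! result-ch))))))
      from)
    (async/<!! (async/into [] to))))
```

For example, with a pool from (Executors/newFixedThreadPool 4), (run-on-pool pool 4 #(* % %) [1 2 3 4]) should return [1 4 9 16]; remember to .shutdown the pool afterwards.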


Yeah, I'm saying: stop using the threadpool and use a pipeline instead


Or are they not "equivalent"? I still don't understand the use case for pipeline-blocking well


a pipeline takes inputs, does something to them, and then outputs them, in order
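A small illustration of the "in order" part, assuming core.async 1.2+ (for to-chan!); slow-inc and ordered-demo are hypothetical names. slow-inc sleeps longer for smaller inputs, so later inputs tend to finish first, yet the output channel still yields results in input order.

```clojure
(require '[clojure.core.async :as async])

(defn slow-inc
  "Hypothetical work function: sleeps longer for smaller inputs."
  [x]
  (Thread/sleep (long (* 20 (- 5 x))))
  (inc x))

(defn ordered-demo []
  (let [from (async/to-chan! [0 1 2 3 4])
        to   (async/chan 5)]
    ;; parallelism 5, so all five inputs can be in flight at once
    (async/pipeline-blocking 5 to (map slow-inc) from)
    (async/<!! (async/into [] to))))

(ordered-demo)
;; => [1 2 3 4 5]
```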


so you could use that to implement processing an http request, but there are a lot of decisions to be made on how to do that


if you are using ServerSocket, the example servers for that kind of thing usually spin up a new thread (or put a task on a threadpool) for each request; the analogue of that would be running a go block, not using a pipeline
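A minimal sketch of that per-connection pattern with java.net.ServerSocket; serve and handle-connection are hypothetical names, and the handler just writes a fixed response without reading or parsing the request. It uses async/thread rather than a go block, on the assumption that socket I/O here is blocking: go blocks run on a small fixed pool and should not block on I/O.

```clojure
(require '[clojure.core.async :as async])
(import '(java.net ServerSocket Socket))

(defn handle-connection
  "Hypothetical handler: writes a fixed HTTP response, then closes the
  socket. It does not read or parse the request."
  [^Socket sock]
  (with-open [s sock]
    (let [out (.getOutputStream s)]
      (.write out (.getBytes "HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok"))
      (.flush out))))

(defn serve
  "Accept loop: hands each accepted connection to its own thread.
  Runs until the ServerSocket is closed, at which point .accept throws."
  [^ServerSocket server]
  (loop []
    (let [sock (.accept server)]
      (async/thread (handle-connection sock)))  ; one thread per connection
    (recur)))
```

In a REPL, something like (async/thread (serve (ServerSocket. 8080))) would start it in the background; the fixed thread pool from the question could replace async/thread without changing the shape of the loop.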


a pipeline is more like a combination of an executor and an ExecutorCompletionService, with some more bits (combining an executor and an ExecutorCompletionService doesn't get you the ordering you get from a pipeline)
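For contrast, a minimal Java-interop sketch of an executor plus ExecutorCompletionService; completion-order is a hypothetical name. .take hands back futures as tasks finish, so results arrive in the order tasks complete rather than the order they were submitted, which is the ordering a pipeline adds on top.

```clojure
(import '(java.util.concurrent Callable ExecutorCompletionService Executors))

(defn completion-order
  "Hypothetical demo: submits one task per input (larger inputs sleep
  less) and collects results in the order the tasks complete."
  [xs]
  (let [pool (Executors/newFixedThreadPool (count xs))
        ecs  (ExecutorCompletionService. pool)]
    (try
      (doseq [x xs]
        (.submit ecs ^Callable (fn []
                                 (Thread/sleep (long (* 20 (- (count xs) x))))
                                 x)))
      ;; .take blocks until the next task finishes, whichever one it is
      (vec (repeatedly (count xs) #(.get (.take ecs))))
      (finally (.shutdown pool)))))
```

Because the exact order depends on thread scheduling, (completion-order [0 1 2 3]) is only guaranteed to contain each input once, not to follow submission order.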


which is to say, a pipeline is not equivalent to an executor


Thanks @U0NCTKEV8, I'll stick with "regular" threadpools. One sign that this isn't a pipeline-blocking problem is that I don't care about the "output"; I just need to "submit" and get the work done in another thread


A pipeline might work well if you implement HTTP request pipelining (or whatever it is called), where you reuse the same TCP connection for multiple HTTP requests


So you would process the requests in parallel and return results in order


Once I get closer to HTTP/2 or websockets I can think about it 😅


well, HTTP/2 is a whole other thing; it does true multiplexing, not pipelining


After some interactions with @U9ABG0ERZ, I see that this use case is more about agents and less about pipelines 😉
