#core-async
2020-09-03
souenzzo12:09:59

Hey, I'm (still) trying to understand pipeline-blocking and its uses. I did a REPL experiment:

(require '[clojure.core.async :as async])

(let [stdin (async/chan)
      stdout (async/chan)
      prn2 #(locking prn (prn %))]
  (async/pipeline-blocking 2
                           stdout
                           (map #(doto % prn2))
                           stdin)
  (async/go-loop []
    (when-let [x (async/<! stdout)]
      (prn2 [:stdout x]))
    (prn2 [:close]))
  (dotimes [i 10]
    (prn2 [:put i (async/put! stdin i)]))
  (async/close! stdin))
It behaves:
- All 10 put! calls return true
- Only 5 (the first 5) elements get processed
- stdout receives only 1 processed result, then receives a close
- In VisualVM, I see a lot of "zombie threads"

None of these behaviors make sense to me. Where can I read more about it? Are there usage examples?

Alex Miller (Clojure team)13:09:12

put! is async so will basically queue up 10 pending puts (returns true if the channel isn't closed and it's not)

Alex Miller (Clojure team)13:09:30

the "zombie" threads are probably not "zombies", they are mostly daemon threads in pools waiting for work and will go away if they don't get any. the go-loop is backed by a pool, and pipeline blocking is backed by a pool, so I'd expect threads from both of those

Alex Miller (Clojure team)13:09:16

you're using a transducer but unbuffered in/out channels, that may be an issue, could be an unhandled exception happening there

Alex Miller (Clojure team)13:09:54

if you change stdin / stdout to have a buffer (async/chan 10) does that change what you see?
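
Reading the snippet above, one likely contributor to the output is that the go-loop body never calls recur, so it takes a single value from stdout and then prints [:close]; with nothing draining stdout, the pipeline would stall after a few items. A minimal sketch of the same experiment with buffered channels (as suggested) and a consumer that recurs until stdout closes. The name run-experiment and the buffer size of 10 are assumptions for illustration:

```clojure
(require '[clojure.core.async :as async])

(defn run-experiment
  "Hypothetical rerun of the REPL experiment. Returns every value
  read from stdout, in the order it was received."
  []
  (let [stdin  (async/chan 10)            ; buffered, per the suggestion
        stdout (async/chan 10)
        prn2   #(locking prn (prn %))
        ;; The consumer recurs until stdout is closed (take returns nil).
        done   (async/go-loop [acc []]
                 (if-some [x (async/<! stdout)]
                   (do (prn2 [:stdout x])
                       (recur (conj acc x)))
                   (do (prn2 [:close])
                       acc)))]
    (async/pipeline-blocking 2 stdout (map #(doto % prn2)) stdin)
    (dotimes [i 10]
      (prn2 [:put i (async/put! stdin i)]))
    ;; Closing stdin lets the pipeline close stdout once it has drained.
    (async/close! stdin)
    ;; Block the calling thread until the consumer has seen the close.
    (async/<!! done)))
```

Under these assumptions, (run-experiment) should return the ten inputs in their original order, because the consumer now keeps draining stdout until the pipeline closes it.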

souenzzo18:09:18

@alexmiller I'm trying to implement a "Clojure-idiomatic" HTTP server on top of Java's ServerSocket (for learning/fun). ATM I'm using java.util.concurrent.Executors/newFixedThreadPool as the thread pool. Does it make sense to use pipeline-blocking in this case?

hiredman18:09:37

pipeline-blocking and pipeline will act like their own additional threadpool, they won't use threads from your threadpool

hiredman18:09:32

you can use pipeline-async to run tasks on your own threadpool
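
A rough sketch of what that could look like: pipeline-async hands each input and a result channel to an async function, which here submits the work to a caller-supplied ExecutorService. The helper name run-on-pool and its parameters are hypothetical, and to-chan! assumes a reasonably recent core.async (older releases offer to-chan instead):

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

(defn run-on-pool
  "Hypothetical helper: applies f to each input on `pool`, with at most
  n tasks in flight, and returns the results as a vector in input order.
  f must not return nil, since nil cannot be put on a channel."
  [^ExecutorService pool n f inputs]
  (let [in  (async/to-chan! inputs)
        out (async/chan n)
        ;; The async function must return immediately; the real work
        ;; runs on the supplied pool and closes `result` when finished.
        af  (fn [v result]
              (.execute pool
                        (fn []
                          (try
                            (async/>!! result (f v))
                            (finally
                              (async/close! result))))))]
    (async/pipeline-async n out af in)
    (async/<!! (async/into [] out))))

;; Usage: the pool is owned, and shut down, by the caller.
(let [pool (Executors/newFixedThreadPool 4)]
  (try
    (run-on-pool pool 4 inc (range 5))
    (finally
      (.shutdown pool))))
```

The pipeline still only coordinates ordering and the number of in-flight tasks; the threads doing the work come from the pool passed in.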

souenzzo18:09:43

Yeah, I'm saying: stop using the threadpool and use a pipeline instead

souenzzo18:09:00

Or are they not "equivalent"? I still don't understand the use case for pipeline-blocking well

hiredman18:09:58

a pipeline takes inputs, does something to them, and then outputs them, in order
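
To illustrate the "in order" part, here is a small sketch in which earlier inputs deliberately take longer than later ones, yet results still come out in input order. The names slow-square and squares-in-order and the sleep durations are made up for the example:

```clojure
(require '[clojure.core.async :as async])

(defn slow-square
  "Hypothetical blocking task: smaller inputs sleep longer, so they
  tend to finish after larger ones."
  [x]
  (Thread/sleep (long (* 10 (- 5 x))))
  (* x x))

(defn squares-in-order
  "Runs slow-square over 0..4 with 4 parallel workers and collects
  the pipeline's output into a vector."
  []
  (let [in  (async/to-chan! (range 5))
        out (async/chan 5)]
    (async/pipeline-blocking 4 out (map slow-square) in)
    (async/<!! (async/into [] out))))
```

Calling (squares-in-order) should yield [0 1 4 9 16] even though the work for 0 finishes last among the first batch of workers.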

hiredman18:09:47

so you could use that to implement processing an http request, but there are a lot of decisions to be made on how to do that

hiredman18:09:51

if you are using ServerSocket, the example servers for that kind of thing usually spin up a new thread (or put a task on a threadpool) for each request; the analogue of that would be running a go block, not using a pipeline

hiredman18:09:35

a pipeline is more like a combination of an executor and an ExecutorCompletionService, with some more bits (combining an executor and an ExecutorCompletionService doesn't get you the ordering you get from a pipeline)

hiredman18:09:24

which is to say, a pipeline is not equivalent to an executor

souenzzo19:09:56

Thanks @U0NCTKEV8, I will stick with "regular" threadpools. One sign that it's not a pipeline-blocking problem is that I don't care about the "output": I just need to "submit" and get the work done in another thread

hiredman19:09:13

A pipeline might work well if you implement http request pipelining or whatever it is called where you re-use the same tcp connection for multiple http requests

hiredman19:09:01

So you would process the requests in parallel and return results in order

souenzzo19:09:59

Once I get closer to HTTP/2 or websockets I can think about it 😅

hiredman20:09:04

well, http2 is a whole other thing, it does true multiplexing, not pipelining

souenzzo18:09:06

After some interactions with @U9ABG0ERZ I see that this use-case is more about agents and less about pipelines 😉
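
For comparison, a minimal sketch of the "submit and forget" style with an agent, assuming the work can be expressed as an action on some shared state. The names handled and submit! are hypothetical, and a real handler would do more than conj the request:

```clojure
;; Hypothetical state: a vector of handled requests held in an agent.
(def handled (agent []))

(defn submit!
  "Queues `request` to be handled on another thread and returns
  immediately, without waiting for a result."
  [request]
  ;; send-off is intended for actions that may block (e.g. I/O).
  ;; Actions sent to one agent run one at a time.
  (send-off handled conj request))

(doseq [r [:a :b :c]]
  (submit! r))

;; Only needed when the caller does want to wait for pending actions.
(await handled)
@handled
```

Actions sent to a single agent are applied one after another, so this gives a serialized work queue rather than a parallel one. A standalone program would also typically call (shutdown-agents) before exiting.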
