#core-async
2020-02-17
souenzzo14:02:36

How do I know which Java methods are "blocking safe"? Is this takeFirst non-blocking? https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingDeque.html#takeFirst()

mpenet14:02:58

"Retrieves and removes the first element of this deque, waiting if necessary until an element becomes available." guess not

souenzzo14:02:10

Is there a ready-made implementation that pipes a BlockingDeque into a chan?

markmarkmark14:02:40

in the case of BlockingDeque, there's actually a table that specifies which methods block at the top of the javadoc
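
To illustrate the distinction that javadoc table draws, here is a minimal sketch using `LinkedBlockingDeque` (one standard `BlockingDeque` implementation, chosen here only for illustration). The var name `dq` is hypothetical. The `poll` variants return a special value or time out, while `takeFirst` waits until an element is available.

```clojure
(import '(java.util.concurrent LinkedBlockingDeque TimeUnit))

;; Hypothetical example deque; created without a capacity bound argument.
(def dq (LinkedBlockingDeque.))

;; "Special value" form: returns nil right away when the deque is empty.
(def polled (.pollFirst dq))

;; "Times out" form: waits at most 10 ms, then returns nil.
(def timed (.pollFirst dq 10 TimeUnit/MILLISECONDS))

;; "Blocks" form: takeFirst would wait here if the deque were empty,
;; so an element is added first.
(.putFirst dq :a)
(def taken (.takeFirst dq))
```

Only the blocking forms need to be kept off the go-block thread pool, for example by wrapping them in async/thread.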

markmarkmark14:02:40

if you wanted to bring a BlockingDeque and core.async together, I would expect lots of calls to async/thread

markmarkmark14:02:03

or at least one per Deque

souenzzo15:02:22

I ended up with

(async/thread
  (loop []
    (let [tx (.take tx-queue)]
      (async/put! conn-chan tx))
    (recur)))

hiredman18:02:55

Use >!!, otherwise you are missing back pressure from the consumer of conn-chan

hiredman18:02:20

Using put! naively is very bad, using >!! often just does the right thing
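
A minimal sketch of the loop above rewritten with >!!, assuming a plain fixed-size buffer on the channel. The helper name `deque->chan` and the vars `q` and `ch` are hypothetical. Because >!! blocks the dedicated thread until the channel accepts the value, a slow consumer slows the loop down instead of piling up pending puts.

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent LinkedBlockingDeque))

(defn deque->chan
  "Hypothetical helper: moves items from blocking queue q onto channel ch
  on a dedicated thread. Stops once ch is closed, since >!! then returns false."
  [^java.util.concurrent.BlockingQueue q ch]
  (async/thread
    (loop []
      ;; .take blocks until an item is available;
      ;; >!! blocks until ch has room for it.
      (when (async/>!! ch (.take q))
        (recur)))))

;; Usage
(def q (LinkedBlockingDeque.))
(def ch (async/chan 1))
(deque->chan q ch)
(.put q :tx-1)
(def received (async/<!! ch)) ;; => :tx-1
```

Note that the loop only notices a closed channel after the next item arrives, because it is parked in .take until then.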

souenzzo19:02:35

conn-chan has a sliding buffer. It will just pub/sub its value

conn-chan (async/chan (async/sliding-buffer 1))
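
A small sketch of what that sliding buffer does with put!, using the same channel definition as above. The keyword values are made up for illustration. A put onto a sliding buffer is always accepted immediately, and when the buffer is full the oldest value is dropped, so the consumer only ever sees the latest one and there is no back pressure to apply.

```clojure
(require '[clojure.core.async :as async])

(def conn-chan (async/chan (async/sliding-buffer 1)))

;; Both puts are accepted immediately; the second one evicts :old.
(async/put! conn-chan :old)
(async/put! conn-chan :new)

(def latest (async/<!! conn-chan)) ;; => :new
```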

mpenet15:02:39

you can also just use future instead of async/thread if you don't take from the returned chan
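
For comparison, a minimal sketch of the two forms side by side. The var names are hypothetical. Both run the body on another thread; they differ in how the result is handed back.

```clojure
(require '[clojure.core.async :as async])

;; async/thread returns a channel that receives the body's result.
(def result-chan (async/thread (+ 1 2)))
(def from-chan (async/<!! result-chan)) ;; => 3

;; future returns an object that is dereferenced to get the result.
(def result-future (future (+ 1 2)))
(def from-future @result-future) ;; => 3
```

If nothing ever reads the result, either form works for a long-running loop. One difference worth keeping in mind is that an exception thrown inside a future only surfaces when the future is dereferenced.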

Ben Sless19:02:30

Hi all, I'm trying to create an async process which takes messages from an input channel, times them out, then passes them to an output channel. Order is not important. As you can see, there's a bug in my implementation, both in that it doesn't create backpressure, and in that it'll throw exceptions if n+1024 messages are consumed before being retired. Any recommendations on how to fix this without hitting the maximum put limit, such as with pipeline-async w/ n>1024?