#core-async
2022-02-21
Ferdinand Beyer16:02:12

Core.async’s thread pools use daemon threads. That means that when I hand tasks off to (go …) blocks or (thread …) and the JVM shuts down, these tasks will just be cancelled. Is there a way to somehow ensure pending work gets done?

Ben Sless16:02:44

Are thread blocks daemonized?

Ben Sless16:02:50

Besides the point that you're not supposed to spin off unbounded, uncontrolled tasks (i.e. your code's design should block on something, waiting for them to finish), you can add a JVM shutdown hook, maybe.
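
For example, a minimal sketch of that shutdown-hook idea, assuming a hypothetical workers-done channel that the application closes once pending work is finished:
(require '[clojure.core.async :as async])

;; Hypothetical channel the application closes once all pending work is done.
(defonce workers-done (async/chan))

(.addShutdownHook (Runtime/getRuntime)
                  (Thread. ^Runnable
                           (fn []
                             ;; Block the shutdown hook until the work is finished,
                             ;; or give up after 5 seconds.
                             (async/alts!! [workers-done (async/timeout 5000)]))))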

Ferdinand Beyer16:02:54

Yep.

(defonce ^:private ^Executor thread-macro-executor
  (Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))
The true is the daemon argument.

Ben Sless16:02:12

Nice. Anyway, this can be solved by design: never spin up a thread or go block that no one waits for. In the end, you'll see that your main thread waits for something to finish

Ferdinand Beyer16:02:15

Hm. So basically I would need to manage my own thread pool for workers to block until their read channel closes?

Ben Sless17:02:10

Can you describe your use case? The good old "what are you trying to do?" Code samples help, too

Ferdinand Beyer17:02:40

Sure. I want to have an in-process event broker so that I can publish things that happen without caring about what needs to happen as a reaction. It could be as simple as:

(def publish-chan (chan))
(def the-pub (pub publish-chan first))
Now I can publish events like this:
(put! publish-chan [:something-happened {}])
Or use >! / >!! to respect back-pressure. In different parts of the code, I want to subscribe to events and do something interesting, e.g. relaying to external systems. Think of it as “whenever a user logs in, send a message to a Slack channel”. A naive approach would be to just read and hand the I/O off to thread:
(let [ch (chan)]
  (sub the-pub :user-logged-in ch)
  (go-loop []
    (when-let [msg (<! ch)]
      (thread (send-to-slack msg))
      (recur))))

Ferdinand Beyer17:02:34

Instead I would need to somehow have one or more “Slack worker threads” running and blocking on a channel.

Ben Sless17:02:32

lots of unbounded resources here

Ben Sless17:02:25

The mistake here is you have two fire-and-forgets

Ben Sless17:02:06

One is putting on the channel: always respect back-pressure. Two, you should wait for the go-loop to terminate. Three, you should wait for the thread to finish

Ben Sless17:02:02

You should use pipeline-blocking, which will manage it for you, and connect some sort of "drain" to the results channel
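
Roughly, and reusing send-to-slack from the example above (the buffer sizes and thread count here are placeholders), that could look like:
(require '[clojure.core.async :as async])

(def events  (async/chan 16))
(def results (async/chan 16))

;; Four worker threads do the blocking I/O; the transducer must not produce
;; nil, since channels reject nil values.
(async/pipeline-blocking 4
                         results
                         (map (fn [msg]
                                (send-to-slack msg)
                                ::sent))
                         events)

;; The "drain": consume results and drop them.  This channel closes once
;; events is closed and everything pending has been processed.
(def drained
  (async/go-loop []
    (when (some? (async/<! results))
      (recur))))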

Ferdinand Beyer17:02:12

One: well, I literally wrote about that, this was just an example. I could also decide to use a windowed (sliding/dropping) buffer and fire-and-forget; this is about decoupling after all

Ben Sless17:02:04

Not precisely. Fire-and-forget is like goto: it has unfortunate results, just like you found out

Ben Sless17:02:28

So you should always have a way of finding out that a go block or thread has finished; never just spin them up

Ben Sless17:02:41

Pipelines give you an abstraction that is similar to a worker pool

Ben Sless17:02:33

And if you don't make sure items are dropped, you'll get an exception if you put items on the channel too fast

Ben Sless17:02:39

(the limit is 1024 pending puts)

Ferdinand Beyer17:02:56

Sure, but this would not happen with >!!

Ferdinand Beyer17:02:18

Anyway, that’s not the point. But you made good points about waiting on the subscriber side

Ferdinand Beyer17:02:09

Maybe I could just mix all the subscribers into one channel, and wait for this one to close in the main thread

Ben Sless17:02:20

Remember that putting and taking are synchronization points. You block your execution flow if you >!!, which you might not want.
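
A tiny sketch of the difference, with a buffer of one:
(require '[clojure.core.async :refer [chan put! >!!]])

(def ch (chan 1))

(>!! ch :a)    ; returns immediately, the buffer is now full
(put! ch :b)   ; also returns immediately; :b becomes a pending put
;; (>!! ch :c) ; would block this thread until a consumer takes a value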

Ben Sless17:02:22

That's an option

Ferdinand Beyer17:02:41

Sure, but this is about back-pressure, right?

Ferdinand Beyer17:02:27

I mean I want to publish and then go on. With a large enough buffer, should be OK. But when the buffer is full, I think it would be better to block

Ben Sless17:02:28

Yes, and/but: your way to control this unbounded concurrency problem is by turning it into back-pressure

Ben Sless17:02:59

Btw, why pubsub here and not a multimethod?

Ferdinand Beyer17:02:25

To allow for multiple subscribers

Ben Sless17:02:26

then (pipeline-blocking n to (map your-mfn) from)?

Ben Sless17:02:35

Ah, multiple handlers for the same message

Ben Sless17:02:38

In that case, I'd create the subs, pipeline them, then mix them all back and wait for that to finish
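
A rough sketch of that shape, with hypothetical handlers notify-slack and audit-log, and arbitrary buffer sizes and worker counts:
(require '[clojure.core.async :as async])

(def publish-chan (async/chan 16))
(def the-pub      (async/pub publish-chan first))

(defn start-subscriber
  "Subscribes handler to topic, running it on a small blocking worker pool.
   Returns the subscriber's output channel, which closes once publish-chan
   closes and all of its pending events have been handled."
  [topic handler]
  (let [in  (async/chan 16)
        out (async/chan 16)]
    (async/sub the-pub topic in)
    (async/pipeline-blocking 2 out (map (fn [msg] (handler msg) ::done)) in)
    out))

(def outputs
  [(start-subscriber :user-logged-in notify-slack)
   (start-subscriber :user-logged-in audit-log)])

;; Mix all subscriber outputs back together and drain them.  `done` closes
;; only after every output channel has closed.
(def done
  (let [merged (async/merge outputs)]
    (async/go-loop []
      (when (some? (async/<! merged))
        (recur)))))

;; Graceful shutdown in the main thread:
;; (async/close! publish-chan)
;; (async/<!! done)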

Ferdinand Beyer17:02:40

I actually want to bundle all that async stuff in one place, so that my “main logic” can stay simple

Ben Sless17:02:47

Then you have clean, graceful shutdown

Ferdinand Beyer17:02:56

Yep, that makes sense.

Ferdinand Beyer17:02:03

Thanks a lot!

👍 1
Ben Sless17:02:07

The nice thing about this pattern is that you know for certain everything finished if you wait on the end channel

Ben Sless17:02:14

I really like this pattern