This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-02-21
Channels
- # announcements (4)
- # architecture (161)
- # autochrome-github (7)
- # babashka (61)
- # beginners (42)
- # calva (24)
- # cider (22)
- # clj-kondo (28)
- # cljs-dev (8)
- # clojure (88)
- # clojure-art (2)
- # clojure-dev (7)
- # clojure-europe (43)
- # clojure-germany (2)
- # clojure-nl (2)
- # clojure-uk (4)
- # clojurescript (32)
- # core-async (41)
- # cursive (32)
- # datahike (6)
- # datomic (9)
- # emacs (22)
- # events (2)
- # fulcro (10)
- # graphql (1)
- # nextjournal (16)
- # off-topic (9)
- # overtone (1)
- # pathom (16)
- # polylith (5)
- # quil (7)
- # rdf (1)
- # re-frame (7)
- # reagent (22)
- # releases (2)
- # remote-jobs (1)
- # reveal (12)
- # sci (1)
- # shadow-cljs (12)
- # specter (20)
- # sql (6)
- # tools-deps (21)
- # vim (26)
- # xtdb (10)
Core.async’s thread pools use daemon threads. That means that when I hand tasks off to (go …)
blocks or (thread …)
, and the JVM is shut down, these tasks will just be cancelled.
Is there a way to somehow ensure pending work is done?
Besides not being supposed to spin off unbounded uncontrolled tasks (I.e. your code's design should block on something waiting Dir them to finish), you can add a JVM shutdown hook. Maybe
Yep.
(defonce ^:private ^Executor thread-macro-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))
The true
is the daemon
argument.Nice. Anyway, this can be solved by design - never spin up a thread or go block that no one waits for. In the end, you'll see your main thread waits for something to finish
Hm. So basically I would need to manage my own thread pool for workers to block until their read channel closes?
Can you describe your use case? The good old "what are you trying to do" Code samples help, too
Sure. I want to have an in-process event broker so that I can publish things happening and not caring what will need to happen as a reaction. Could be as simple as:
(def publish-chan (chan))
(def the-pub (pub chan first))
Now I can publish events like that:
(put! publish-chan [:something-happened {}])
Or use >!
/ >!!
to respect back-pressure.
In different parts of the code, I want to subscribe to events and do something interesting, e.g. relaying to external systems. Think of it as “whenever a user logs in, send a message to a slack channel”.
Naive would be to just read and forward I/O using thread
:
(let [ch (chan)]
(sub the-pub :user-logged-in ch)
(go-loop [] (when [msg (<! ch)] (thread (send-to-slack msg)) (recur))))
Instead I would need to somehow have one ore more “slack worker threads” running and blocking on a channel.
one is putting on the channel. Always respect back pressure two is you should wait on the go-loop to terminate three you should wait for the thread to finish
You should use pipeline-blocking
, which will manage it for you, and connect some sort of "drain" to the results channel
One - well I literally wrote about that, this was just an example. I could also decide to use a windowed buffer and fire-and-forget — this is about decoupling after all
Not precisely, fire and forget is like goto
, it has unfortunate results, just like you found out
So you should always have a way of finding out a go block or thread finished, never just spin them up
And if you don't make sure the items are dropped you'll get an exception if you put items too fast on the channel
Sure, but this would not happen with >!!
E.g. that’s not the point. But you made good points about waiting on the subscriber side
Maybe I could just mix all the subscribers into one channel, and wait for this one to close in the main thread
Remember that putting and taking are synchronization points. You block your execution flow if you >!!
, which you might not want.
Sure, but this is about back-pressure, right?
I mean I want to publish and then go on. With a large enough buffer, should be OK. But when the buffer is full, I think it would be better to block
Yes, and/but - your way to control this unbounded concurrency problem is by turning it into back-pressure
To allow for multiple subscribers
in that case, I'd create the subs, pipeline them, then mix them all back and wait for that to finish
I actually want to bundle all that async stuff in one place, So that my “main logic” can stay simple
Yep, that makes sense.