This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-05-25
Every now and then I find myself in a situation where I want to check if a channel is closed. For example, when we start a background `thread` and want to check if the returned channel is not closed (= thread is still running). Unfortunately, `closed?` is not part of the “public” API. Is it considered OK to use `core.async.impl.protocols/closed?` for this?
This would only work if the `thread` returned non-`nil`, and only once, right? E.g. running -> `poll!` returns `nil`. Finished with non-`nil` value: `poll!` returns non-`nil`, next `poll!` returns `nil` again…
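A tiny sketch of the `poll!` behavior described above (a minimal example for illustration, not from the original thread):

```clojure
(require '[clojure.core.async :as a])

;; `thread` returns a channel that delivers the body's return value
;; (if non-nil) and then closes.
(def worker (a/thread :result))

;; While the thread is still running, poll! returns nil.
;; After it finishes, the first poll! consumes :result;
;; every later poll! returns nil again, indistinguishable
;; from "still running".
(a/poll! worker)
```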
Instead of `poll!` you can do: `(alt! <thread-channel> :finished :default :not-finished)`. This should work even if the thread returns `nil` and you can check multiple times.
Thanks, this should work indeed.
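Outside of a `go` block the blocking variant `alt!!` would be used; a minimal sketch of the suggested check (names here are illustrative):

```clojure
(require '[clojure.core.async :as a :refer [alt!!]])

(defn finished? [worker-ch]
  ;; :default fires only when no take is immediately ready.
  ;; Once the channel is closed, a take (of nil) is always ready,
  ;; so this keeps returning :finished on repeated calls.
  (alt!! worker-ch :finished
         :default :not-finished))

(def worker (a/thread (Thread/sleep 50) :done))
(finished? worker) ;; :not-finished while the thread is still running
```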
However, this seems like a terribly complicated way of checking if a channel is closed.
Let me rephrase my question: Do we consider `core.async.impl` private/internal? And if yes: Can we provide a public `closed?`?
`core.async.impl` is definitely not part of the public interface. Having a public `closed?` has been proposed ages ago, but it's considered a potential footgun, I wouldn't hold my breath
https://clojure.atlassian.net/browse/ASYNC-126
Thanks for the link. Meh. Not sure how I feel about it
I think this is a bit of a simplistic claim though, isn’t it? Seems like we want to prevent people from doing `(if-not (closed? ch) (put! ch val))`, but my use case is entirely different.
> However, this seems like a terribly complicated way of checking if a channel is closed.

IIUC, your semantic goal is to check if some spawned thread is still processing work, not whether the chan is closed
Yes to both. There are of course other ways of doing that but I am not convinced they are “better”. In my case we already have a thread polling for events, and I wanted to periodically check if this is still running. The channel will close when the work in the thread is done executing, so this seems like a pretty legitimate signal to check to me.
Nothing. I want to check whether it is running
I have a thread that might end when an exception occurs, and I want to check whether this is running in a Kubernetes pod liveness probe
That’s what I said, yes 🙂
Sorry, I don’t want to be rude and I appreciate you wanting to help. But this is not helping. I know there are other ways of doing that. I can also just initialize an atom and set it when done.
That’s not the point though
A channel has a perfectly defined notion of being closed or not. This can be used in multiple ways, e.g. with `alt!!`, or with `impl/closed?`. I have a channel that will close when the process finishes. And I want to check if that happened. So why shouldn’t I be able to do just that?
(Will be away for a while)
This is veering into square-pegs-in-round-holes territory. The way to do something when a channel closes is to take from it, that's CSP; any other way you're fighting the semantics
It's clear from your problem this is a "when" question, not an "if" question. The way to make things happen in sequence in CSP is clear
To me it seems like a strange limitation. People are able to handle this power responsibly elsewhere, like with `clojure.core/realized?`
If he wrapped his channel operations, and maintained an object with an open flag himself, your objections would go away. But why would that actually be better?
the problem is buried somewhere in the OP > every now and then I find myself in a situation where I want to check if a channel is closed > [is the thread still running?]
I don't think so - he already said that the process is completed when the channel is closed. For all we know this is just affecting the color of a little dot on a dashboard. Why is it necessarily a problem, especially for this case?
I'm going to stick to thinking about the application task and not the impl. Is it a fan-out situation? Is it something where there is a director doing pipelining? Are there competing SLAs for different tasks?
There is a nice comment on the "please add closed?" ticket from @U053S2W0V:
> Please do not add a closed? predicate to the public API. It's a race condition in 99.9% of cases. Go does not provide such a predicate and after growing accustom to CSP via both core.async and Go, I have never felt the need for it. Having the predicate would invite its use. I enjoy clojure.core's preference for omission of questionable functions and expect the same from core.async.
> If somebody desperately feels the need for this predicate, ask in IRC or Slack, I'm sure somebody can help you fix the structuring of your channels/processes/etc.
Do you agree with the suggestion above to `reset!` an atom at the end of the channel? If so, why would checking that atom lead to a more correct program than checking the internally maintained closed value here? Seems like it would just be the same thing with more code.
you can swap an atom, or return info on a channel, or print to stdout, or push to SQS to publish information
Right. I think it is publishing information, because he said "nothing" needs to happen when it is completed. I think he understands that you generally don't compose things that way.
Well I think it isn't literally nothing, so I'm guessing it is publishing progress/status information. But still, doing it the atom way would accomplish the same thing, yet wouldn't be 'wrong'. Yet it would be functionally the same...
all I'm trying to say is I don't know, but before knowing I have to set aside a closed channel predicate because it's an impl to some higher level need
Yea I understand. I'm interpreting Ferdinand as questioning whether that was a good decision, and I see his point for cases like this. Whether in general it still makes sense to not allow it because of all the other cases where people would tend to misuse it, I don't know, but I have some doubts. But probably nothing will happen, so I'll leave it there.
The interesting thing is that when you enqueue a read on a channel with `<!` you already get periodic polling via the runtime. Why not use that?
Thanks for all your replies!
> It’s clear from your problem this is a "when" question, not an "if" question. The way to make things happen in sequence in CSP is clear
I don’t agree @UK0810AQ2. Maybe it helps when I explain in more detail what situation I found myself in.
We have a service running in Kubernetes. It starts a Kafka consumer in an async thread. There is some existing graceful shutdown logic to signal this consumer to stop, and wait for the thread to close, using `alt!!` and a timeout.
The service also has a liveness probe, i.e. Kubernetes will periodically call a web endpoint to check if everything is fine.
Now we had an issue that this consumer “crashed” when an event handler threw an exception. The existing code would exit the async thread in this case.
Now this puts our service in a defunct state, as it keeps running but stops consuming. A pragmatic improvement would be to check if the consumer is up periodically in the health check. When it stopped, just report the service unhealthy and restart it.
So in the scope of the health check, this is an “if the thread is running”, not a “when it finishes”.
Of course, the problem can be rephrased. And there are multiple other ways of dealing with this situation.
HOWEVER, a super simple solution would be to just say: healthy if all consumers are running. This can be done with `impl.protocols/closed?`. It can also be done by writing your own `closed?` using the public API (as suggested by @UCPGSBNQ4 here):

```clojure
(defn closed? [ch]
  (alt!! ch true :default false))
```
Do you really think checking for the channel to be closed this way is bad, or that achieving the exact same answer with different, more complex methods is inherently better? My point is: channels being closed is not an implementation detail, it is a well-defined and documented property of a channel. So why hide it, or ask people to test it indirectly in a more complicated way?
I think we're going to have to agree to disagree here,
I think, essentially, there are several conceptual clashes:
One is a mixture of synchronous behavior (channels, whens) with asynchronous behavior (polling)
The other is a mixture of implementation details of several levels of abstraction;
One is a channel being closed or open vs. a process running or not and its status
The other is of components, their status and its monitoring, i.e. you need some internal monitor, or supervisor, and each process/component needs to have a way to send signals to that monitor, where you can do something like `(go (if (bad? (<! thread)) (signal monitor)))` and the monitor will manage and keep the health status of all components;
Then when you get a health check, you just ask the monitor
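A rough sketch of such a monitor, under the stated assumptions (all names here are hypothetical, not from the original discussion):

```clojure
(require '[clojure.core.async :as a :refer [go go-loop thread <! >!]])

;; The monitor keeps the latest status of each component in an atom,
;; fed by a channel that components push signals onto.
(def component-status (atom {}))
(def monitor-ch (a/chan 16))

(go-loop []
  (when-let [[component state] (<! monitor-ch)]
    (swap! component-status assoc component state)
    (recur)))

;; Each supervised process signals the monitor when it exits:
(let [worker (thread :work-result)]
  (go (<! worker)                      ; parks until the thread finishes
      (>! monitor-ch [:kafka-consumer :stopped])))

;; The health check endpoint just asks the monitor's state:
(defn healthy? []
  (not= :stopped (:kafka-consumer @component-status)))
```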
OK to disagree 🙂
There are multiple ways to achieve this. Health checks usually use a “pull” approach, while you are suggesting a “push” approach. Both are valid, but I don’t see why we should patronize people.
Abstraction: I think it is perfectly valid to say: a health check asks if consumers are healthy. This can be implemented by asking if a process is running. This in turn can be implemented by asking if a channel is closed, because this is what `async/thread` and `async/go` provide to you: a channel that will close when the async process exits.
I just want to point out that `(alt!! ch true :default false)` is NOT the same thing as checking if the channel is closed. Unlike the real `closed?`, it will take a value if available and return `true` in that case even when the channel isn't actually closed.
But for this context I think it's perfect, since it does what you need using basic CSP ops and avoids the need for the actual `closed?` function which doesn't fit CSP.
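The difference can be demonstrated with an open channel that holds a buffered value (a minimal illustration of the caveat, using the `alt!!`-based check from this thread):

```clojure
(require '[clojure.core.async :as a :refer [alt!!]])

;; The alt!!-based check suggested in this thread:
(defn closed? [ch]
  (alt!! ch true :default false))

;; An open channel holding a buffered value:
(def c (a/chan 1))
(a/put! c :pending)

;; Returns true even though c is still open, and it consumes
;; :pending as a side effect, unlike a real closed? predicate would.
(closed? c)
```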
Thanks for the context. At nubank we have Kafka consumers running in async/threads (with an internal retry loop), and our health check uses a 'pull' model from the outside, checking an atom
we have a healthchecking protocol, and the healthchecker doesn't know that the info is coming from an atom.
I would suggest pretending all the core async impl stuff doesn't exist, and try to model everything with plain, public Clojure.
@UCPGSBNQ4 you are right of course. What I dislike about that workaround is that eventually it will resort to `impl/closed?` internally, adding a lot of bloat that could be avoided by calling `closed?` directly. Also, it is not trivial to understand, and I also had to run it to convince myself it works, as the docs did not seem clear to me on whether a closed channel or `:default` would take precedence, i.e. whether a take on a closed channel is considered “immediately ready”. So it adds a lot of complexity.
@U050ECB92 — Sure, and of course we are doing something similar. We don’t have the need for a protocol yet, but the health check will of course not check channels directly. It will delegate to something like `kafka-consumers-running?`.
At some point though you need to map it to the actual implementation of the consumer loop. And yes, sure, one could use atoms. I am repeating myself here 😉
There’s more to it. In our case we had `OutOfMemoryError`s that “killed” the consumer thread.
So a solution that sets an atom would need to:
• Create and manage an atom somewhere
• Wrap code inside the thread in try-catch
• `finally` reset/swap the atom
That’s not terribly difficult, but there is some room for errors (e.g. catching `Exception` instead of everything / using `finally`), and what you end up with is just the same as checking the channel, with more code.
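For comparison, a sketch of the atom-based version with those pieces in place (`run-consumer-loop!` is a hypothetical stand-in for the real consumer loop):

```clojure
(require '[clojure.core.async :as a :refer [thread]])

(def consumer-running? (atom true))

(defn run-consumer-loop! []
  ;; hypothetical placeholder for the real Kafka consumer loop
  )

(def consumer
  (thread
    (try
      (run-consumer-loop!)
      ;; `finally` also runs for Errors such as OutOfMemoryError,
      ;; which a plain (catch Exception ...) would miss.
      (finally
        (reset! consumer-running? false)))))

;; Liveness probe handler:
(defn healthy? [] @consumer-running?)
```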
All correct and good and right. But. You don’t always need the “perfect” solution.
In my case we had a defunct service and I wanted to add a pragmatic simple solution without having to refactor a lot of code
Killing pods with Kubernetes healthchecks is certainly less elegant than circuit breakers / retries in code, but they do work
And I got this bandaid on within minutes 🤷
Simple needs -> simple solutions. More elaborate needs -> more elaborate solutions?