#core-async
2022-05-25
Ferdinand Beyer 07:05:41

Every now and then I find myself in a situation where I want to check whether a channel is closed. For example, we start a background thread and want to check whether the returned channel is still open (i.e., the thread is still running). Unfortunately, closed? is not part of the “public” API. Is it considered OK to use clojure.core.async.impl.protocols/closed? for this?

Ben Sless 08:05:51

Why not poll it?

Ferdinand Beyer 08:05:23

This would only work if the thread returned a non-`nil` value, and only once, right? E.g.:
• running: poll! returns nil
• finished with a non-nil value: poll! returns that value, but the next poll! returns nil again…

Jan K 11:05:14

Instead of poll! you can do:

(alt! <thread-channel> :finished :default :not-finished)
This should work even if the thread returns nil and you can check multiple times.
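A minimal runnable sketch of this check, assuming core.async is on the classpath; thread-status is a hypothetical name. Outside a go block the blocking alt!! is used instead of alt!. A take on a channel that holds a value or is closed counts as immediately ready, so :default is chosen only while the thread is still running. If the thread returned a non-nil value, the first check consumes it, and later checks still see the closed channel.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: non-blocking liveness check for a channel returned
;; by async/thread or async/go. A take is "immediately ready" when a value
;; is waiting or the channel is closed; otherwise the :default branch wins.
;; Caveat: a waiting non-nil result is consumed by the first call.
(defn thread-status [ch]
  (async/alt!!
    ch :finished
    :default :not-finished))
```

Under these assumptions, a thread blocked on a promise reports :not-finished, and once it has returned nil, every subsequent call reports :finished.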

Ferdinand Beyer 11:05:22

Thanks, this should work indeed. However, this seems like a terribly complicated way of checking whether a channel is closed. Let me rephrase my question: Do we consider core.async.impl private/internal? And if so, can we provide a public closed? function?

Jan K 11:05:50

core.async.impl is definitely not part of the public interface. A public closed? was proposed ages ago, but it's considered a potential footgun, so I wouldn't hold my breath: https://clojure.atlassian.net/browse/ASYNC-126

Ferdinand Beyer 11:05:11

Thanks for the link. Meh. Not sure how I feel about it.

ghadi 12:05:53

a closed? predicate is just a recipe for race conditions

ghadi 12:05:22

it was rejected pretty early in core.async's design

Ferdinand Beyer 12:05:50

I think this is a bit of a simplistic claim, though, isn’t it? It seems like we want to prevent people from doing (if-not (closed? ch) (put! ch val)), but my use case is entirely different.

ghadi 13:05:17

have you considered other solutions besides checking if the chan is closed?

ghadi 13:05:36

> However, this seems like a terribly complicated way of checking if a channel is closed.

IIUC, your semantic goal is to check whether some spawned thread is still processing work, not whether the chan is closed

Ferdinand Beyer 13:05:04

Yes to both. There are of course other ways of doing that, but I am not convinced they are “better”. In my case we already have a thread polling for events, and I wanted to periodically check whether it is still running. The channel will close when the work in the thread is done executing, so to me this seems like a pretty legitimate signal to check.

Ben Sless 16:05:30

What exactly do you want to happen when this thread finishes?

Ferdinand Beyer 16:05:24

Nothing. I want to check whether it is running.

Ferdinand Beyer 16:05:23

I have a thread that might end when an exception occurs, and I want to check whether it is still running in a Kubernetes pod liveness probe.

Ben Sless 16:05:52

So this is a liveness check?

Ferdinand Beyer 16:05:08

That’s what I said, yes 🙂

Ben Sless 16:05:02

just making sure

Ben Sless 16:05:10

so what you need is a notion of when ... then

Ben Sless 16:05:37

with channels:

(go (<! (your thread)) (do stuff))

Ben Sless 16:05:34

(go
  (<! (thread
        (reset! running? true)
        ,,,))
  (reset! running? false))
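A self-contained version of this atom-based sketch, assuming core.async is available; start-tracked and work-fn are hypothetical names. Unlike the snippet, the flag is set before the thread is spawned, so there is no window in which the work is scheduled but the atom still reads false. Because async/thread closes its channel when the body exits, including when it throws, the go block also resets the flag after a failure.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: runs (work-fn) on an async/thread and keeps the
;; `running?` atom in sync with it. Returns the watcher go channel, which
;; closes once the flag has been reset to false.
(defn start-tracked [running? work-fn]
  (reset! running? true)                 ; set before spawning: no false-negative window
  (let [done (async/thread (work-fn))]   ; closes when work-fn returns or throws
    (async/go
      (async/<! done)                    ; parks until a value arrives or the channel closes
      (reset! running? false)
      nil)))
```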

Ferdinand Beyer 16:05:45

Sorry, I don’t want to be rude and I appreciate you wanting to help. But this is not helping. I know there are other ways of doing that. I can also just initialize an atom and set it when done.

Ferdinand Beyer 16:05:52

That’s not the point, though.

Ben Sless 16:05:27

Isn't it? Why does it have to be polling?

Ferdinand Beyer 16:05:11

A channel has a perfectly defined notion of being closed or not. This can be used in multiple ways, e.g. with alt!!, or with impl/closed?. I have a channel that will close when the process finishes. And I want to check if that happened. So why shouldn’t I be able to do just that?

Ferdinand Beyer 16:05:56

(Will be away for a while)

Ben Sless 17:05:18

This is veering into square-pegs-in-round-holes territory. The way to do something when a channel closes is to take from it; that's CSP. Any other way, you're fighting the semantics.

Ben Sless 17:05:49

It's clear from your problem this is a when question, not an if question. The way to make things happen in sequence in CSP is clear.

isak 19:05:41

To me it seems like a strange limitation. People are able to handle this power responsibly elsewhere, like with clojure.core/realized?.

isak 19:05:56

If he wrapped his channel operations and maintained an object with an open flag himself, your objections would go away. But why would that actually be better?

ghadi 19:05:14

this is getting off track IMO

ghadi 19:05:54

the problem is buried somewhere in the OP:
> every now and then I find myself in a situation where I want to check if a channel is closed
> [is the thread still running?]

ghadi 19:05:10

within that, there is a problem

ghadi 19:05:34

"I want to check if a channel is closed" is not a problem

isak 19:05:36

I don't think so: he already said that the process is completed when the channel is closed. For all we know, this is just affecting the color of a little dot on a dashboard. Why is it necessarily a problem, especially in this case?

ghadi 19:05:30

I'm going to keep thinking about the application task and not the impl. Is it a fan-out situation? Is it something where there is a director doing pipelining? Are there competing SLAs for different tasks?

ghadi 19:05:17

there is a nice comment on the "please add closed?" ticket from @U053S2W0V:
> Please do not add a closed? predicate to the public API. It's a race condition in 99.9% of cases. Go does not provide such a predicate and after growing accustomed to CSP via both core.async and Go, I have never felt the need for it. Having the predicate would invite its use. I enjoy clojure.core's preference for omission of questionable functions and expect the same from core.async.
> If somebody desperately feels the need for this predicate, ask in IRC or Slack, I'm sure somebody can help you fix the structuring of your channels/processes/etc.

isak 19:05:43

Do you agree with the suggestion above to reset! an atom at the end of the channel? If so, why would checking that atom lead to a more correct program than checking the internally maintained closed value here? Seems like it would just be the same thing with more code.

ghadi 19:05:42

that's one way to do something

ghadi 19:05:40

you can swap an atom, or return info on a channel, or print to stdout, or push to SQS to publish information

ghadi 19:05:58

but i don't know whether publishing information is the task at hand

ghadi 20:05:26

if someone wants to wait on a process/value, <!! or alts is a great way

isak 20:05:10

Right. I think it is publishing information, because he said "nothing" needs to happen when it is completed. I think he understands that you generally don't compose things that way.

ghadi 20:05:16

if nothing needs to happen why even care?

ghadi 20:05:47

something cares...

isak 20:05:48

Well, I think it isn't literally nothing, so I'm guessing it is publishing progress/status information. But still, doing it the atom way would accomplish the same thing and wouldn't be considered 'wrong', yet it would be functionally the same...

ghadi 20:05:23

(my tone is intended to be friendly btw)

ghadi 20:05:21

all I'm trying to say is I don't know, but before knowing, I have to set aside a closed-channel predicate because it's an impl detail of some higher-level need

ghadi 20:05:40

the absence of closed? in go and core.async is deliberate

isak 20:05:37

Yeah, I understand. I'm interpreting Ferdinand as questioning whether that was a good decision, and I see his point for cases like this. Whether in general it still makes sense to disallow it because of all the other cases where people would tend to misuse it, I don't know, but I have some doubts. But probably nothing will happen, so I'll leave it there.

Ben Sless 04:05:52

The interesting thing is that when you enqueue a read on a channel with <! you already get periodic polling via the runtime. Why not use that?

Ferdinand Beyer 09:05:40

Thanks for all your replies!

> It’s clear from your problem this is a when question, not an if question. The way to make things happen in sequence in CSP is clear

I don’t agree, @Ben Sless. Maybe it helps if I explain in more detail what situation I found myself in.

We have a service running in Kubernetes. It starts a Kafka consumer in an async thread. There is some existing graceful shutdown logic that signals this consumer to stop and waits for the thread to close, using alt!! and a timeout. The service also has a liveness probe, i.e. Kubernetes periodically calls a web endpoint to check whether everything is fine.

Now we had an issue where this consumer “crashed” when an event handler threw an exception. The existing code would exit the async thread in this case. This puts our service in a defunct state: it keeps running but stops consuming.

A pragmatic improvement would be to periodically check in the health check whether the consumer is up. When it has stopped, just report the service as unhealthy and restart it. So in the scope of the health check, this is an “if the thread is running”, not a “when it finishes”.

Of course, the problem can be rephrased, and there are multiple other ways of dealing with this situation. HOWEVER, a super simple solution would be to just say: healthy if all consumers are running. This can be done with impl.protocols/closed?. It can also be done by writing your own closed? using the public API (as suggested by @Jan K here):

(defn closed? [ch]
  (alt!! ch true :default false))
Do you really think checking whether the channel is closed this way is bad, or that achieving the exact same answer with different, more complex methods is inherently better? My point is: a channel being closed is not an implementation detail; it is a well-defined and documented property of a channel. So why hide it, or ask people to test for it indirectly in a more complicated way?

Ben Sless 10:05:08

I think we're going to have to agree to disagree here. Essentially, there are several conceptual clashes:
• One is a mixture of synchronous behavior (channels, whens) with asynchronous behavior (polling).
• The other is a mixture of implementation details from several levels of abstraction. One is a channel being closed or open vs. a process running or not, and its status. The other is components, their status, and its monitoring: you need some internal monitor, or supervisor, and each process/component needs a way to send signals to that monitor, where you can do something like (go (if (bad? (<! thread)) (signal monitor))). The monitor then manages and keeps the health status of all components.

Then, when you get a health check, you just ask the monitor.

Ferdinand Beyer 10:05:28

OK to disagree 🙂 There are multiple ways to achieve this. Health checks usually use a “pull” approach, while you are suggesting a “push” approach. Both are valid, but I don’t see why we should patronize people.

Abstraction: I think it is perfectly valid to say: a health check asks whether consumers are healthy. This can be implemented by asking whether a process is running. This in turn can be implemented by asking whether a channel is closed, because this is what async/thread and async/go give you: a channel that will close when the async process exits.

Jan K 11:05:59

I just want to point out that (alt!! ch true :default false) is NOT the same thing as checking whether the channel is closed. Unlike the real closed?, it will take a value if one is available and return true in that case, even when the channel isn't actually closed. But for this context I think it's perfect, since it does what you need using basic CSP ops and avoids the need for the actual closed? function, which doesn't fit CSP.
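A small demonstration of this caveat, assuming core.async is available; closed-or-ready? and demo are hypothetical names for the alt!!-based check and its exercise. On an open buffered channel holding a value, the check returns true and consumes the value; it reflects the channel's real closed state only once the buffer is empty.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical name: true if a take on ch is immediately ready,
;; i.e. a value is available OR the channel is closed.
(defn closed-or-ready? [ch]
  (async/alt!! ch true :default false))

(defn demo []
  (let [ch (async/chan 1)]
    (async/>!! ch :x)                    ; open channel with one buffered value
    [(closed-or-ready? ch)               ; true, although ch is open; :x is consumed
     (closed-or-ready? ch)               ; false: open and now empty
     (do (async/close! ch)
         (closed-or-ready? ch))]))       ; true: actually closed
```

Calling (demo) returns [true false true].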

ghadi 12:05:33

Thanks for the context. At Nubank we have Kafka consumers running in async/threads (with an internal retry loop), and our health check uses a 'pull' model from the outside, checking an atom.

ghadi 12:05:07

component ---has-many-> consumer thread

ghadi 12:05:34

we have a healthchecking protocol, and the healthchecker doesn't know that the info is coming from an atom.
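A rough sketch of the shape described here, with the health checker depending only on a protocol. HealthCheck, healthy?, Consumer, start-consumer and all-healthy? are hypothetical names, not the actual Nubank implementation, and core.async is assumed to be available. The status is kept in a map inside an atom rather than a bare flag, so more fields could be added later without changing the protocol.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical protocol: the health checker only ever calls this.
(defprotocol HealthCheck
  (healthy? [this] "True if the component reports itself healthy."))

;; Hypothetical consumer record: status lives in an atom holding a map.
;; `watcher` is the go channel that closes once the status is updated.
(defrecord Consumer [state watcher]
  HealthCheck
  (healthy? [_] (= :running (:status @state))))

(defn start-consumer
  "Runs (work-fn) on an async/thread; status becomes :stopped when it exits."
  [work-fn]
  (let [state   (atom {:status :running})
        done    (async/thread (work-fn))
        watcher (async/go
                  (async/<! done)
                  (swap! state assoc :status :stopped)
                  nil)]
    (->Consumer state watcher)))

;; A component that has many consumers is healthy if all of them are.
(defn all-healthy? [consumers]
  (every? healthy? consumers))
```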

ghadi 12:05:25

I would suggest pretending all the core async impl stuff doesn't exist, and try to model everything with plain, public Clojure.

Ferdinand Beyer 12:05:27

@Jan K you are right, of course. What I dislike about that workaround is that eventually it will resort to impl/closed? internally, adding a lot of bloat that could be avoided by calling closed? directly. Also, it is not trivial to understand: I had to run it to convince myself it works, as the docs did not make it clear to me whether a closed channel or :default would take precedence, i.e. whether a take on a closed channel is considered “immediately ready”. So it adds a lot of complexity.

Ferdinand Beyer 12:05:04

@ghadi Sure, and of course we are doing something similar. We don’t need a protocol yet, but the health check will of course not check channels directly. It will delegate to something like kafka-consumers-running?. At some point, though, you need to map it to the actual implementation of the consumer loop. And yes, sure, one could use atoms. I am repeating myself here 😉

Ferdinand Beyer 12:05:52

There’s more to it. In our case we had OutOfMemoryErrors that “killed” the consumer thread. So a solution that sets an atom would need to:
• Create and manage an atom somewhere
• Wrap the code inside the thread in try-catch
• Reset/swap the atom in a finally block

That’s not terribly difficult, but there is some room for errors (e.g. catching Exception instead of everything, or not using finally), and what you end up with is just the same as checking the channel, with more code.
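For comparison, a minimal sketch of the three steps listed above, assuming core.async is available; consumer-loop, poll-once! and running? are hypothetical names. Using finally rather than (catch Exception e ...) is what covers the OutOfMemoryError case: finally also runs when an Error unwinds the thread, while a catch of Exception never sees it.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical consumer loop: calls (poll-once!) until it returns falsey,
;; tracking liveness in the caller-supplied `running?` atom.
(defn consumer-loop [running? poll-once!]
  (reset! running? true)                 ; the atom is created and managed by the caller
  (async/thread
    (try                                 ; wrap the whole thread body
      (loop []
        (when (poll-once!)
          (recur)))
      (finally                           ; runs on normal exit and on any Throwable
        (reset! running? false)))))
```

The returned thread channel closes after the finally block has run, so a caller waiting on it can rely on the flag already being false.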

ghadi 13:05:19

our impl also has a mechanism to pause consumers from the outside

ghadi 13:05:40

and a circuit breaker in case exceptions are persistent

ghadi 13:05:55

so IIRC the 'state' atom isn't a simple flag

Ferdinand Beyer 13:05:32

All correct and good and right. But. You don’t always need the “perfect” solution.

Ferdinand Beyer 13:05:02

In my case we had a defunct service, and I wanted to add a pragmatic, simple solution without having to refactor a lot of code.

Ferdinand Beyer 13:05:49

Killing pods with Kubernetes health checks is certainly less elegant than circuit breakers / retries in code, but it does work.

Ferdinand Beyer 13:05:10

And I got this bandaid on within minutes 🤷

Ferdinand Beyer 13:05:26

Simple needs -> simple solutions
More elaborate needs -> more elaborate solutions?

Ben Sless 13:05:23

I've been imbibing too many Joe Armstrong talks recently, and they've been percolating in my head together with The Language of the System. I'm starting to see, everywhere, the absence of the simple systems that Erlang gets via OTP behaviors such as gen_server.