missionary

2026-04-13T17:39:21.530799Z

I’m trying to understand the coupled/decoupled distinction mentioned at https://clojurians.slack.com/archives/CL85MBPEF/p1710088061237329. See in thread for details.

2026-04-13T17:39:37.056709Z

If an uninitialized discrete flow is decoupled because it’s writable, shouldn’t a coupled discrete flow become decoupled when a value is read? And with m/buffer, shouldn’t a buffered discrete flow be considered decoupled unless the buffer is full? I guess I’m asking whether the coupled/decoupled distinction is useful beyond what the discrete/continuous distinction already captures.

leonoel 2026-04-13T19:06:35.608289Z

Decoupling essentially means the producer behavior is not impacted by the consumption rate. If a discrete flow is buffered, but the buffer never gets full in practice, it is practically decoupled. Is it a useful concept ? Probably yes, a decoupled data source is easier to define as it's essentially push-driven (e.g. m/observe). A coupled data source must implement flow control somehow.

leonoel 2026-04-13T19:07:11.580389Z

> If an uninitialized discrete flow is decoupled because it’s writable, shouldn’t a coupled discrete flow become decoupled when a value is read? this I don't understand

2026-04-15T14:48:16.336639Z

I’m mulling things over and things are becoming clearer. I have a couple more questions… 1. When you talk about a flow being “initialized” or “uninitialized”, is that referring to the state of the flow when it’s created (so it’s a property that doesn’t change when the flow gets its first value)? Or does a flow change from being “uninitialized” to “initialized” when it gets its first value, like a variable in a programming language? 2. You said this for m/reductions: if you pass a decoupled flow you get a continuous flow, if you pass a coupled flow you get a discrete flow (so the result could be either continuous or discrete). But the docstring for m/reductions says “Returns a discrete flow running given discrete flow”. Is the docstring incorrect?

2026-04-15T16:01:08.761699Z

Ah; I just realised that the second question is answered by the thread I linked to at the start of this thread.

leonoel 2026-04-15T16:20:46.798329Z

1- the former. If you pass an uninitialized flow to a continuous operator like m/latest, it will check that it's ready to transfer on boot and crash immediately

👍 1
2026-04-14T09:19:43.790399Z

For the bit you didn’t understand… that was me misunderstanding some of the thread I linked to, so please ignore it. The “practically decoupled” bit helped. Are the following statements correct? 1. Some operators create discrete flows. 2. Some operators create continuous flows. 3. No operator creates both discrete flows and continuous flows. 4. Discrete flows are coupled, and produce backpressure when they are full. (Here, “full” means “has a pending value” for an unbuffered flow or “buffer is full” for a buffered flow.)

2026-04-14T09:20:03.418309Z

I’m puzzled by your mention of m/observe when talking about decoupled flows. Doesn’t m/observe create a discrete, coupled flow?

leonoel 2026-04-14T10:15:03.225119Z

1&2 yes 3 no - in fact most operators propagate backpressure and therefore preserve coupling, i.e. coupled in -> coupled out, decoupled in -> decoupled out. Example : m/reductions - if you pass a decoupled flow you get a continuous flow, if you pass a coupled flow you get a discrete flow. 4 no - coupled flows are discrete, opposite is not true. m/observe is decoupled but discrete (because uninitialized). The reason why it is decoupled is because there is no way for the consumer to slow the rate down, the consumer is supposed to be always ready to process. It is incorrect to use it on a slow consumer, that's why we provide a backpressure strategy in this case (e.g. m/relieve or m/buffer, which are effectively fast consumers)

🤔 1
leonoel 2026-04-14T12:11:23.521959Z

I now realize that it may be more correct to say that m/observe is meant to be decoupled, rather than m/observe returns a decoupled flow. Strictly speaking m/observe behavior is impacted by the consumption rate in the sense that a slow consumer generates runtime errors, but this should never happen in a correct program.

leonoel 2026-04-14T12:22:35.433959Z

I also realize that you may have been confused by the current JVM implementation of m/observe, which blocks the calling thread if the consumer is not ready. This was a failed experiment, I do not plan to keep this behavior, to my current understanding the right behavior is to throw an exception in this case.

2026-04-14T14:17:04.105009Z

Thanks. I need to mull this over. I may well be back with further questions. 🙂

leonoel 2026-04-14T14:23:22.763369Z

Your questions are welcome. I'm still in the process of formalizing these concepts properly, I'm happy to be challenged.

👍 1
2026-04-17T12:33:17.213339Z

Is it true to say that initialized/uninitialized and coupled/decoupled are fundamental properties, and discrete/continuous is a consequence of the other two? If not, is there some similar, but true, statement?

leonoel 2026-04-17T12:37:51.485499Z

This is an equivalence relation. continuous is defined as initialized + decoupled, discrete is defined as not continuous.

2026-04-17T12:40:23.180509Z

Ah, right. Thanks for all your answers in this thread — things are much clearer to me now.

👍 1
awb99 2026-06-23T04:17:18.428159Z

@leonoel I am worried of the idea of m/observe throwing exceptions. So when a consumer is a m/ap that does even the slightest synchronous calculation inside the m/ap block then this could risk throwing exceptions in the source of the flow as soon as the consumer has not finished processing before the next value comes along Blocking until the value can be be pushed downstream seems like a sensible thing to do. If m/observe would throw exceptions; then either this exception has to be catched and then retried. Or there need to be buffers everywhere. The. Only way where I think an exception makes sense is that it allows to have a send timeout.

leonoel 2026-06-23T08:58:36.598049Z

If the calculation is truly synchronous (i.e. not a task, or a task completing immediately), the transfer is also synchronous, therefore it's not a backpressure error, no exception should be thrown.

leonoel 2026-06-23T08:59:36.513429Z

> The only way where I think an exception makes sense is that it allows to have a send timeout. Could you explain the pattern ?

awb99 2026-06-23T15:18:32.858299Z

The first transfer is syncronous. Do you mean that the second transfer would only happen after the synchronous part has already finished? In which case there is no exception. But if the consumer is a m/ap that has m/? inside. THEN the consumer might not be ready when the next value is ready in the producer. So then the producer would get the exception. So when I catch this exception then I could decide to wait some time and then retry. If there is no consumer after some time, I could then give up. I am not sure this is the right pattern though. Because then each time observe gets used there are more edge cases that need to be thought of.

leonoel 2026-06-23T15:54:17.654629Z

yes, the synchronous part will be run completely during the first callback execution

leonoel 2026-06-23T15:55:20.543479Z

and yes, you could detect the exception as a polling mechanism and I agree it's a dubious pattern