Hi all, is this an acceptable place to give feedback on c.a.flow? If so:
• flow monitor's UI can't handle processes with large state. I've got one process that batches up items for a later process, and a batch can exceed 1000 items. The UI for that process expands to fill the width of the window and performance slows dramatically
• Stopping a flow fails if a process is blocking. More specifically, what I've seen is that flow/stop returns without an error, but processes are not actually stopped.
• Error messages from timed-out futures (for compute workload processes) are not helpful. Inspecting the full value put into the :error-chan gives more clues, but it seems like wrapping the future in a try/catch and rethrowing with ex-info might make timeouts easier to diagnose?
Also, I asked a similar question in ask.clojure but didn't get an answer, so I'll ask a simpler question here:
If an i/o process's transform runs for a very long time, is that bad? If I need to kick off and wait for a long task from the transform arity, what should I do instead of just blocking until it returns?
yes, good place to share feedback, thx!
• /cc @jarrodctaylor on the flow-monitor question. We are looking at ways to set filters on the flow state in general, and probably also need more ways to control visualization in the monitor
• is the problem that stop returns before processes are stopped or that processes don't stop at all?
• can you share more about what you're talking about?
• I haven't gotten to your ask yet, sorry. But I don't think long-running transforms are a problem; the question is more what ping should do in that case
Thanks, Alex.
Regarding #2, the problem I'm seeing is that the processes don't stop at all. I'll try to set up a repro for it when I get a chance.
Regarding #3, I had a :compute workload process that did too much in its transform arity, taking longer than the default 5 seconds, so the future in which flow runs :compute transform fns timed out. The logging I had set up for items from the :error-chan (as returned by flow/start) only printed the ::flow/ex key, which showed me only a stack trace that didn't include any of my code at all, only clojure namespaces. I was able to figure out what was going on by switching to logs of the full :error-chan value, but even then it wasn't immediately obvious. I think it could be immediately obvious if the future which wraps the transform fn was itself wrapped in a try/catch which would rethrow the timeout exception with an ex-info that indicates more precisely the issue (e.g. ":compute workload process transform timed out") and which process is responsible.
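A minimal sketch of the kind of wrapping suggested above, using only clojure.core and java.util.concurrent. `call-with-timeout`, its arguments, and the `:pid` / `:timeout-ms` keys are hypothetical names for illustration; this is not how core.async.flow runs :compute transforms internally, only a picture of what a more descriptive timeout error could look like.

```clojure
(import '(java.util.concurrent Future TimeUnit TimeoutException))

(defn call-with-timeout
  "Hypothetical helper (not part of core.async.flow). Runs the zero-arg fn f
  in a future and waits up to timeout-ms for its result. On timeout, cancels
  the future and rethrows as an ex-info naming the responsible process."
  [pid timeout-ms f]
  (let [^Future fut (future (f))]
    (try
      (.get fut timeout-ms TimeUnit/MILLISECONDS)
      (catch TimeoutException e
        (future-cancel fut)
        (throw (ex-info ":compute workload process transform timed out"
                        {:pid pid :timeout-ms timeout-ms}
                        e))))))

;; A transform that takes longer than the allowed 50 ms:
(def timeout-data
  (try
    (call-with-timeout :batcher 50 #(do (Thread/sleep 500) :done))
    (catch clojure.lang.ExceptionInfo e
      (ex-data e))))
;; timeout-data => {:pid :batcher, :timeout-ms 50}
```

With something like this, logging only the exception would already show the message and the process id, without having to inspect the rest of the error value.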
I was wrong about #2 - the problem was on my end.
Does anyone have a good example where they refactored something to use flow? Coming back to this idea months later, I'm still not sure I understand when I would introduce Clojure go channels between processes on a single machine inside a JVM, and so need something to manage the flow between them.
> I have seen core async effectively used to manage communication between systems separated by network
Can you elaborate? I am actually curious how that was done to be effective. CSP specifically does not say much about network, as far as I know. Nor does core.async.
You can use it to grab data that arrives via a callback mechanism and put that data on a channel. But that’s done after the data has arrived over the network or other IO.
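A rough sketch of that callback-to-channel pattern, assuming core.async is on the classpath. `callback->chan` and `register!` are hypothetical: `register!` stands in for whatever function a client library offers to install a one-argument callback.

```clojure
(require '[clojure.core.async :as a])

(defn callback->chan
  "Hypothetical helper. register! is assumed to take a 1-arg callback and
  call it whenever data arrives. Returns a channel carrying that data."
  [register! buf-size]
  (let [ch (a/chan buf-size)]
    ;; put! never blocks the calling (IO) thread. If the buffer is full,
    ;; the put is queued as pending on the channel.
    (register! (fn [data] (a/put! ch data)))
    ch))

;; Simulate a library that stores callbacks and later invokes them:
(def callbacks (atom []))
(def ch (callback->chan #(swap! callbacks conj %) 10))
(doseq [cb @callbacks] (cb {:payload "hello"}))
(def received (a/<!! ch))
;; received => {:payload "hello"}
```

Note that `put!` itself applies no backpressure to the callback source; a channel only accepts a limited number of pending puts, so a source that outruns its consumer for long still needs to be throttled some other way.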
That's what I'm referring to. I'm just giving that as an example of how i have seen it used.
Doesn't sente model connections as queues?
> i haven’t seen an example where, for example, back pressure, made sense between two functions on the same jvm
Imagine you are reading constantly from fast local disk, but the network (uplink) is variable - sometimes slow, sometimes fast, might even be unavailable sometimes. You are doing some pre-processing of the messages in the Clojure/JVM before sending out. You can use core.async to slow down that pre-processing if the network is unable to accept the already processed messages. Otherwise you’d be putting extra memory pressure on the machine for no reason when the network (uplink) is slow.
If you don’t have backpressure, you might even OOM in a very naive scenario, depending on how everything is coded. Proper use of core.async can prevent that from happening by reading/processing just enough data that can be sent out at any moment.
Purely hypothetical, but I think realistic scenario. Hope it helps illustrate the point.
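To make the scenario above concrete, here is a small sketch assuming core.async is on the classpath. `run-pipeline` and its counters are made up for illustration: the producer stands in for the fast disk reader plus pre-processing, the consumer's `Thread/sleep` stands in for the slow uplink, and the fixed-size channel buffer is what makes the producer wait.

```clojure
(require '[clojure.core.async :as a])

(defn run-pipeline
  "Pushes n items through a bounded channel to a deliberately slow consumer.
  Returns what was sent and the largest number of items that were produced
  but not yet sent at any one time."
  [n buffer-size]
  (let [ch            (a/chan buffer-size)
        in-flight     (atom 0)
        max-in-flight (atom 0)
        ;; Fast producer: >!! blocks once the buffer is full (backpressure).
        _producer     (a/thread
                        (dotimes [i n]
                          (swap! max-in-flight max (swap! in-flight inc))
                          (a/>!! ch i))
                        (a/close! ch))
        ;; Slow consumer: simulates a slow network send per item.
        consumer      (a/thread
                        (loop [sent []]
                          (if-some [msg (a/<!! ch)]
                            (do (Thread/sleep 5)
                                (swap! in-flight dec)
                                (recur (conj sent msg)))
                            sent)))]
    {:sent          (a/<!! consumer)
     :max-in-flight @max-in-flight}))

(def result (run-pipeline 20 3))
;; (:sent result) holds all 20 items in order, and (:max-in-flight result)
;; stays at or below buffer-size + 2: the buffered items, plus one being
;; sent, plus one the producer is blocked on.
```

Swapping the fixed buffer for an unbounded queue would let the producer race ahead and hold all n items in memory, which is the naive OOM case described above.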
Extra pressure because you're keeping more in memory than you need to?
Yes
You’ve pre-processed them; they need to be sent out, if they can’t be, either have to throw them away and try later (pre-process again, use CPU again, etc), or keep them somewhere.
So in this case flow helps manage the coordination between those two channels?
I don’t know too much about flow 🙂 But I am guessing it can.
(but I’ve been using core.async more or less since it was released a while back)
Thanks. What's your favorite example usage in your history?
Favorite? Probably UI development and animations 😜 But I am definitely in the minority there, it’s an acquired taste. One concern people have is that the go macro ends up being quite a bit of compiled JS code in terms of size. Depends on whether that matters for your use case.
For servers and Clojure on the JVM, if you need to design a reliable system that is sensitive to backpressure: that’s probably a great use case (similar cases to what I wrote above).
If you’re specifically interested in flow, I assume you’ve seen this video/demo https://www.youtube.com/watch?v=lXFwf3O4BVY
You'd use it when you are building any kind of event handling/propagation.
Which means you likely have Kafka, SQS, RabbitMQ or something similar in your architecture.
If you don't, it's very rare you'd need it.
But I think the typical example is, say, one process receives Kafka events, sends each one over to another process that does the parsing, which sends it over to another process that routes it, which sends it over to another process that extracts/transforms, which sends it over to another process that publishes a new event or makes one or more API calls out of it.
And that service that uses core.async to do all that might subscribe to many streams, or many events and process/handle them all concurrently at high scale.
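A toy version of that chain, written with plain core.async `pipeline` rather than flow, and assuming core.async is on the classpath. The stage functions `parse`, `route`, and `transform`, the `key=value` message format, and the topic names are all invented for illustration; a real service would read from Kafka or similar instead of a vector.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.string :as str])

;; Hypothetical stage functions, one per "process" in the chain.
(defn parse [raw]
  (let [[k v] (str/split raw #"=" 2)]
    {:key k :value v}))

(defn route [event]
  (assoc event :topic (if (str/starts-with? (:key event) "user.") :users :other)))

(defn transform [event]
  (update event :value str/upper-case))

(defn run-stages
  "Sends raw messages through parse -> route -> transform, each stage
  running with up to 4 parallel workers and a bounded output channel."
  [raws]
  (let [in     (a/to-chan! raws)
        parsed (a/chan 8)
        routed (a/chan 8)
        out    (a/chan 8)]
    (a/pipeline 4 parsed (map parse) in)
    (a/pipeline 4 routed (map route) parsed)
    (a/pipeline 4 out (map transform) routed)
    ;; pipeline preserves input order and closes each output channel
    ;; when its input channel closes.
    (a/<!! (a/into [] out))))

(def events (run-stages ["user.name=ann" "sys.boot=ok"]))
;; events => [{:key "user.name", :value "ANN", :topic :users}
;;            {:key "sys.boot", :value "OK", :topic :other}]
```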
If your question is “when I would introduce channels” I would start from the original talks + rationale https://www.youtube.com/watch?v=yJxFPoxqzWE
In terms of ClojureScript / UI dev, this is a less well-known demo, but I clearly remember it was one of my original “aha” moments, specifically for UI/frontend dev https://www.youtube.com/watch?v=AhxcGGeh5ho
I wouldn’t personally recommend jumping into flow without being comfortable with core.async by itself, and the fundamental CSP concepts.
I feel my question was significantly different. Misguided maybe, but different. I have seen core async effectively used to manage communication between systems separated by network, but I haven't seen an example where, for example, back pressure made sense between two functions on the same JVM. My intuition is that this kind of functionality would typically find its way into something like a database. As in, it's an atypical optimization.
Queues can also be used for logical decoupling between subsystems, though I've not seen that very often.
> but i haven’t seen an example where, for example, back pressure, made sense between two functions on the same jvm
We use it for a variety of things, but one of them is what I would classify as pipelining. We process SPARQL requests in our product, and basically the SPARQL request is parsed into an AST which eventually has a core.async pipeline attached to the various elements of the AST. Some of them are serialized, and some are parallel, but we are processing multiple triples at the same time. In our case, it’s not about the “backpressure” per se (though that does come into play). It’s about the processing model: we basically attach a go routine to every major element of the AST graph and information flows through, and core.async’s model means it’s flowing through with maximum concurrency.
(If I wrote it today, I would likely have used flow, but this work predates flow, so it just builds a given pipeline using raw core.async primitives.)
@ghaskins why use queues there if you're just doing parallel processing? Why not one of the many tools aimed directly at that? I feel like for a queue to make sense, it's because you want to control the narrative around enqueuing and dequeuing in relation to its source and destination.
Before Java virtual threads, go blocks were one of the only ways to get a high level of concurrency with a low memory and context-switch footprint on the JVM.