
All other changes for 0.9.7 are staged, should be ready to roll as soon as we clean up that last patch.


Any hints on how I shut down a peer when this occurs? The exception will just continue to be thrown and is quickly filling up our logs.


Hi @zamaterian. That is a difficult condition to resolve, since I believe it'll be due to the conductor misbehaving, in which case you possibly need to reboot both the aeron media driver and the peer.


Are you seeing any errors out of the media driver? I'll see if we can get the media driver rebooting under such conditions.


It mainly happens because of resource starvation, e.g. when running a peer with Docker where the CPUs and memory allocated are too small. I just started looking into timbre middleware for filtering out the repeated log statements. I only see a few errors in the aeron log. It would be fine if the peer just shut down; then we could start it again.


Okay, yeah, there are a few things to solve here


The repeated exceptions are definitely a problem for this case, because they will be emitted constantly in this sort of situation


this will be improved greatly with the next version’s messaging model, however it might be a while before it’s out. So we may need to patch around the error for now


@zamaterian: what is the exact error coming out of the peer? I’m a bit surprised that you might be getting ControlProtocolException out of the peers. Are you using the embedded driver?


I got ControlProtocolException a single time from the aeron log. Same setup as the onyx-template, with separate JVMs for Onyx and Aeron


Maybe you could send me a copy of the logs after you take anything private out of them?


@lucasbradstreet: good to see redis offering something in that space!


Can :window/init and :trigger/sync be used to persist window state across different jobs (by loading and saving the window state to a persistent store)?


@joshg: yes, I believe this would work well. You would have to ensure that you persist / restore for the right task “slots” (their slot numbers) as well as the right task name.


You could check the trigger event-type :job-killed from :trigger/sync to ensure it’s only persisted when you actually kill the job
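To make that concrete, here’s a rough sketch of what such a trigger could look like. The key names are from memory of the 0.9.x trigger API, and `persist-window-state!` / `save-state!` are hypothetical functions you would write yourself, so treat the details as assumptions rather than a verified recipe:

```clojure
;; Trigger entry that calls a sync function on every segment.
{:trigger/window-id  :collect-segments
 :trigger/refinement :onyx.refinements/accumulating
 :trigger/on         :onyx.triggers/segment
 :trigger/threshold  [1 :elements]
 :trigger/sync       :my.app/persist-window-state!}

;; Sync function that only persists when the job is being killed.
;; `save-state!` is a hypothetical function writing to your store.
(defn persist-window-state!
  [event window trigger {:keys [event-type] :as state-event} extent-state]
  (when (= :job-killed event-type)
    (save-state! (:window/id window) extent-state)))
```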


Hmm. :window/init might not be a good fit though


Would that not work with :window/aggregation :onyx.windowing.aggregation/conj?
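For reference, a window entry along those lines might look like the sketch below. The keys are from memory of Onyx 0.9.x windowing options and should be double-checked against the cheat sheet; the idea is that `:window/init` would seed the conj aggregation’s collection, e.g. with state restored from a previous job:

```clojure
;; Sketch of a global window using the built-in conj aggregation.
;; `:collect-segments` and `:identity` are placeholder names.
{:window/id          :collect-segments
 :window/task        :identity
 :window/type        :global
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/init        []}
```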


Or is there another way to initialize a window?


The problem with using window/init is that you might have multiple windows over your window-state, and those windows are only initialised when they have a reason to be


I see, and window/init doesn’t give you enough context to know which window you’re initializing?


Yeah, that’s right


Hmm. Is there another way to initialize a window? Maybe by using a special message on job initialization or something?


It’s tough. I think all of the solutions I would suggest would end up being harder than adding the feature once and for all


OK, and that’s 8–10 weeks out?


Something like that. I’d love to do it now, but it’ll be easier when some of our future changes land, and we’re a bit busy for it now unless it’s sponsored work


I could point you in the right direction to try it yourself though. It may not end up being all that hard


What do you think it would take (if I was to look at it)?


I think the key would be to allow the task to be supplied with the previous job’s task-id for that task name


Once that’s in place, it should just play back from the previous job


Getting the feature right is tricky though, because you need to worry about it then playing back from the new job’s checkpoints.


It’s a conceptually simple but tricky feature really


I’ll have a think about it. There may be an easier way


And that previous job’s task ID would be supplied in the configuration? Would it be easier (and perhaps more general) to add more context to :window/init and make it the responsibility of the user to initialize the window?


The problem is that we’d also need to inject which windows were around previously


So we start needing to restore some additional state. I guess we could have an initial-windows feature, or something like that


You’re referring to refinement mode windows?


Where one needs the previous windows?


Well, state windows, which are used by the window aggregations (reducers), refinements, and triggers


You need some way to initialise which windows exist when the task starts up each time, to seed the windows, and then play back the window log on top of those


These are all seemingly simple but get a bit complex when you start accounting for restoration of the remaining state for running jobs


Besides, I wouldn’t want to rely on :job-killed to persist your window states because you have no guarantee that the call will be successfully made if things go down


Much better to rely on the existing fault tolerance mechanisms


I’ll have a think about how we can just restore the saved state from the previous jobs. I think everyone will be happier. I think I may have an easy solution, but these things can be deceptive, so I will have to mull it over a bit


It seems difficult to restore the state of previous jobs. Would that work if a new job was submitted with additional tasks?


It should be OK, you could just have it restore the state of the remaining tasks. The only thing that gets complex is if your two jobs have different n-peers/max-peers values on the same tasks


This is just because of the way that segments get hash-routed, as they would get hash-routed to different slots between the jobs
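The intuition is easy to see with a toy routing function. This is illustrative only, not Onyx’s actual routing code: if grouped segments are assigned to a slot by hashing the group key modulo the slot count, then changing the number of slots between jobs can send the same key to a different slot, so restored per-slot state no longer lines up:

```clojure
;; Illustrative sketch, not Onyx internals: route a group key to a slot.
(defn route-slot [group-key n-slots]
  (mod (hash group-key) n-slots))

;; The same key generally lands on a different slot when the
;; slot count changes between the old job and the new one:
(route-slot :user-42 4) ;; a slot in 0-3
(route-slot :user-42 5) ;; usually a different slot, now in 0-4
```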


Yeah, I see how that can be tricky. It could be an initial requirement that those parameters cannot change.


Yeah, I think that would be an OK initial constraint


> A current limitation is that the parallelism of your program has to match the parallelism of the program from which the savepoint was drawn. In future versions of Flink, there will be support to re-partition the state of programs both during runtime and savepoint restores.


Yeah. I think it's reasonable. Like Kafka partitions, the recommendation would be to overprovision the peers on the grouping tasks a bit, so you have room to grow


My idea still feels pretty sound. If I have half an hour spare in the next few days I may give it a go, but no promises. I'll only really have time if it turns out to be fairly trivial. It may end up being so.


We'll send out a link in an email shortly. Here's the Webinar live stream link for this Thursday if you want to save it now.


@lucasbradstreet: Cool, thanks. I'll take a look at it as well. Thanks for pointing me in the right direction.


The overprovisioning constraint seems completely reasonable. As you said, we already have that requirement with message keys and Kafka topic partitions.


@lucasbradstreet: Flink allows you to specify a task UID that is used to restore the task’s state across jobs:


Would it make sense to add that key to a task’s window definition?


In this way, you would explicitly be opting-in to persisting task state across jobs.
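As a sketch of what that opt-in might look like on a task map: note that `:onyx/restore-uid` is NOT an existing Onyx key, just a hypothetical illustration of the proposal. A task in a new job declaring the same uid would restore the window state checkpointed under that uid by the previous job:

```clojure
;; Hypothetical: `:onyx/restore-uid` does not exist in Onyx today.
{:onyx/name        :process-events
 :onyx/fn          :my.app/process
 :onyx/type        :function
 :onyx/restore-uid "events-window-state-v1"
 :onyx/batch-size  20}
```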


That seems like it could be a good solution. It's somewhat along the lines of what I was thinking


One problem is that the window logs for a task are currently bundled together when they're written to bookkeeper, so it'd be easy to have a custom task save uid, but having a custom window one would be harder


That part isn't ideal, and maybe should be improved, but it does make it harder to fix the issue until we get the new state implementation done


I think that’s OK, the UID could be for the task rather than the window.


Wow, I just noticed how early it was for you! Sorry!


Standard Lucas hours.


Do the tests have external dependencies? The onyx.windowing.aggregation-average-test is hanging on Starting Onyx test environment.