#onyx
2016-06-06
michaeldrogalis04:06:13

All other changes for 0.9.7 are staged, should be ready to roll as soon as we clean up that last patch.

zamaterian06:06:13

Any hints on how I shut down a peer when the uk.co.real_logic.aeron.exceptions.ConductorServiceTimeoutException occurs? Since the exception will just continue to be thrown and is quickly filling up our logs.

lucasbradstreet07:06:11

Hi @zamaterian. That is a difficult condition to resolve since I believe it'll be due to the conductor operating badly, in which case you possibly need to reboot both the aeron media driver and the peer

lucasbradstreet07:06:45

Are you seeing any errors out of the media driver? I'll see if we can get the media driver rebooting under such conditions.

zamaterian07:06:05

It mainly happens because of resource starvation, e.g. when running a peer with Docker where the CPUs and memory allocated are too small. I just started looking into Timbre middleware for filtering out the repeated log statements. I only see a few errors in the aeron log: uk.co.real_logic.aeron.driver.exceptions.ControlProtocolException. It would be fine if the peer just shut down. Then we could start it again.
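
On the Timbre middleware idea, something along these lines could suppress the repeated entries. This is only a sketch: it assumes the standard middleware contract (a middleware fn returns the data map, or nil to drop the entry) and that the throwable and message live under the keys named in the comments, which vary slightly by Timbre version.

```clojure
(require '[taoensso.timbre :as timbre])

;; Middleware fns receive the log data map and return it (possibly
;; modified) or nil to drop the entry. Depending on the Timbre version,
;; the throwable is under :?err or the delay :?err_, so check both.
(defn drop-conductor-timeouts [data]
  (let [err (or (:?err data) (some-> (:?err_ data) force))
        msg (some-> (:msg_ data) force)]
    (if (or (and err (re-find #"ConductorServiceTimeoutException" (str err)))
            (and msg (re-find #"ConductorServiceTimeoutException" msg)))
      nil ;; swallow the repeated Aeron conductor timeout spam
      data)))

(timbre/merge-config! {:middleware [drop-conductor-timeouts]})
```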

lucasbradstreet07:06:07

Okay, yeah, there are a few things to solve here

lucasbradstreet07:06:37

The repeated exceptions are definitely a problem for this case, because they will be emitted constantly in this sort of situation

lucasbradstreet07:06:28

This will be improved greatly with the next version’s messaging model; however, it might be a while before it’s out, so we may need to patch around the error for now

lucasbradstreet08:06:02

@zamaterian: what is the exact error coming out of the peer? I’m a bit surprised that you might be getting ControlProtocolException out of the peers. Are you using the embedded driver?

zamaterian09:06:42

I got ControlProtocolException a single time from the aeron log. It’s the same setup as the onyx-template, with separate JVMs for Onyx and Aeron

lucasbradstreet09:06:28

Maybe you could send me a copy of the logs after you take anything private out of them?

gardnervickers12:06:27

@lucasbradstreet: good to see redis offering something in that space!

joshg16:06:10

Can :window/init and :trigger/sync be used to persist window state across different jobs (by loading and saving the window state to a persistent store)?

lucasbradstreet16:06:11

@joshg: yes, I believe this would work well. You would have to ensure that you persist / restore for the right task “slots” (their number) as well as the right task name.

lucasbradstreet16:06:42

You could check the trigger event-type :job-killed from :trigger/sync to ensure it’s only persisted when you actually kill the job
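
For concreteness, here is a minimal sketch of that approach, assuming the 0.9.x five-argument :trigger/sync calling convention and the 0.9-style trigger keywords; save-window-state! is a hypothetical stand-in for your durable store.

```clojure
;; Hypothetical persistence call; replace with your durable store
;; (Redis, S3, a database, ...).
(defn save-window-state! [m]
  (println "persisting window state:" m))

(defn persist-on-kill!
  [event window trigger {:keys [event-type lower-bound upper-bound]} state]
  ;; Only write the window contents out when the job is actually killed.
  (when (= :job-killed event-type)
    (save-window-state! {:window-id (:window/id window)
                         :extent    [lower-bound upper-bound]
                         :state     state})))

;; Trigger entry wiring the sync function in (0.9.x keyword forms; newer
;; releases namespace the :trigger/on and :trigger/refinement values).
{:trigger/window-id  :collect-segments
 :trigger/refinement :accumulating
 :trigger/on         :segment
 :trigger/threshold  [1 :elements]
 :trigger/sync       ::persist-on-kill!}
```

The restore side is the harder part discussed below; :window/init alone doesn’t cover it.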

lucasbradstreet16:06:26

Hmm. :window/init might not be a good fit though

joshg16:06:04

Would that not work with :window/aggregation :onyx.windowing.aggregation/conj?

joshg16:06:26

Or is there another way to initialize a window?

lucasbradstreet16:06:57

The problem with using window/init is that you might have multiple windows over your window-state, and those windows are only initialised when they have a reason to be

joshg16:06:34

I see, and window/init doesn’t give you enough context to know which window you’re initializing?

lucasbradstreet16:06:44

Yeah, that’s right

joshg16:06:57

Hmm. Is there another way to initialize a window? Maybe by using a special message on job initialization or something?

lucasbradstreet16:06:11

It’s tough. I think all of the solutions I would suggest would end up being harder than adding the feature once and for all

joshg16:06:35

OK, and that’s 8–10 weeks out?

lucasbradstreet16:06:07

Something like that. I’d love to do it now, but it’ll be easier when some of our future changes land, and we’re a bit busy for it now unless it’s sponsored work

lucasbradstreet16:06:29

I could point you in the right direction to try it yourself though. It may not end up being all that hard

joshg16:06:58

What do you think it would take (if I were to look at it)?

lucasbradstreet16:06:35

I think the key would be to allow https://github.com/onyx-platform/onyx/blob/0.9.x/src/onyx/state/log/bookkeeper.clj#L220 to be supplied with the previous job’s task-id for that task name

lucasbradstreet16:06:55

Once that’s in place, it should just play back from the previous job

lucasbradstreet16:06:42

Getting the feature right is tricky though, because you need to worry about it then playing back from the new job’s checkpoints.

lucasbradstreet16:06:05

It’s a simple but tough feature really

lucasbradstreet16:06:39

I’ll have a think about it. There may be an easier way

joshg16:06:42

And that previous job’s task ID would be supplied in the configuration? Would it be easier (and perhaps more general) to add more context to :window/init and make it the responsibility of the user to initialize the window?

lucasbradstreet16:06:12

The problem is that we’d also need to inject which windows were around previously

lucasbradstreet16:06:41

So we start needing to restore some additional state. I guess we could have an initial-windows feature, or something like that

joshg16:06:04

You’re referring to refinement mode windows?

joshg16:06:16

Where one needs the previous windows?

lucasbradstreet16:06:08

Well, state windows, which are used by the window aggregations (reducers), refinements, and triggers

lucasbradstreet16:06:48

You need some way to initialise what windows are there when the task starts up each time, to seed the windows, then play back the window log on top of those

lucasbradstreet16:06:20

These are all seemingly simple but get a bit complex when you start accounting for restoration of the remaining state for running jobs

lucasbradstreet16:06:52

Besides, I wouldn’t want to rely on :job-killed to persist your window states because you have no guarantee that the call will be successfully made if things go down

lucasbradstreet16:06:02

Much better to rely on the existing fault tolerance mechanisms

lucasbradstreet16:06:46

I’ll have a think about how we can just restore the saved state from the previous jobs. I think everyone will be happier. I think I may have an easy solution, but these things can be deceptive, so I will have to mull it over a bit

joshg16:06:23

It seems difficult to restore the state of previous jobs. Would that work if a new job was submitted with additional tasks?

lucasbradstreet16:06:27

It should be OK; you could just have it restore the state of the remaining tasks. The only thing that gets complex is if your two jobs have different numbers of n-peers/max-peers on the same tasks

lucasbradstreet16:06:54

This is just because of the way that segments get hash-routed, as they would get hash-routed to different slots between the jobs
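
The slot-routing point can be seen with a toy illustration (this is not Onyx’s actual routing code, just the shape of the problem): anything of the form (mod (hash k) n-slots) sends the same group key to a different slot as soon as the slot count changes between jobs.

```clojure
;; Toy illustration only, not Onyx internals.
(defn slot-for [group-key n-slots]
  (mod (hash group-key) n-slots))

(slot-for "user-42" 3) ;; some slot in #{0 1 2}
(slot-for "user-42" 5) ;; very likely a different slot
```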

joshg17:06:56

Yeah, I see how that can be tricky. It could be an initial requirement that those parameters cannot change.

lucasbradstreet17:06:56

Yeah, I think that would be an OK initial constraint

joshg18:06:22

> A current limitation is that the parallelism of your program has to match the parallelism of the program from which the savepoint was drawn. In future versions of Flink, there will be support to re-partition the state of programs both during runtime and savepoint restores.

lucasbradstreet18:06:21

Yeah. I think it's reasonable. Like Kafka partitions, the recommendation would be to overprovision the peers on the grouping tasks a bit, so you have room to grow

lucasbradstreet18:06:24

My idea still feels pretty sound. If I have half an hour spare in the next few days I may give it a go, but no promises. I'll only really have time if it turns out to be fairly trivial. It may end up being so.

michaeldrogalis18:06:25

We'll send out a link in an email shortly. Here's the Webinar live stream link for this Thursday if you want to save it now. https://www.youtube.com/watch?v=5eEKZa2DSJI

joshg18:06:01

@lucasbradstreet: Cool, thanks. I'll take a look at it as well. Thanks for pointing me in the right direction.

joshg18:06:24

The overprovisioning constraint seems completely reasonable. As you said, we already have that requirement with message keys and Kafka topic partitions.

joshg19:06:17

@lucasbradstreet: Flink allows you to specify a task UID that is used to restore the task’s state across jobs: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html#changes-to-your-program

joshg19:06:36

Would it make sense to add that key to a task’s window definition?

joshg19:06:14

In this way, you would explicitly be opting in to persisting task state across jobs.
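
For illustration, the opt-in key being proposed might sit on the window map roughly like this; the :window/restore-uid name is invented here, and no such key exists in Onyx 0.9.x.

```clojure
;; Hypothetical sketch of the opt-in idea; :window/restore-uid is a made-up
;; key kept stable across job submissions so a new job could locate the
;; previous job's persisted window state.
{:window/id          :collect-segments
 :window/task        :identity
 :window/type        :global
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/restore-uid "orders-rollup-v1"}
```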

lucasbradstreet20:06:20

That seems like it could be a good solution. It's somewhat along the lines of what I was thinking

lucasbradstreet20:06:18

One problem is that the window logs for a task are currently bundled together when they're written to bookkeeper, so it'd be easy to have a custom task save uid, but having a custom window one would be harder

lucasbradstreet20:06:27

That part isn't ideal, and maybe should be improved, but it does make it harder to fix the issue until we get the new state implementation done

joshg20:06:46

I think that’s OK, the UID could be for the task rather than the window.

joshg20:06:09

Wow, I just noticed how early it was for you! Sorry!

michaeldrogalis20:06:46

Standard Lucas hours.

joshg23:06:24

Do the tests have external dependencies? The onyx.windowing.aggregation-average-test is hanging on "Starting Onyx test environment".