#onyx
2017-07-18
maxk09:07:07

Hey everyone. Could you please help me with this exception:

Lost and regained image with the same session-id and different correlationId

maxk09:07:26

What does it mean?

michaeldrogalis14:07:06

@maxk That’s a message related to an implementation detail of Aeron, the messaging layer we use. Did you see it as a failed assertion, a thrown exception, or just an info log line?

maxk14:07:44

17-07-18 08:44:16 Maxs-MacBook-Pro-2.local WARN [onyx.peer.task-lifecycle:141] - 
                                          java.lang.Thread.run                       Thread.java:  745
            java.util.concurrent.ThreadPoolExecutor$Worker.run           ThreadPoolExecutor.java:  617
             java.util.concurrent.ThreadPoolExecutor.runWorker           ThreadPoolExecutor.java: 1142
                                                           ...                                        
                             clojure.core.async/thread-call/fn                         async.clj:  439
             onyx.peer.task-lifecycle/start-task-lifecycle!/fn                task_lifecycle.clj:  939
                  onyx.peer.task-lifecycle/run-task-lifecycle!                task_lifecycle.clj:  466
                            onyx.peer.task-lifecycle/iteration                task_lifecycle.clj:  448
                onyx.peer.task-lifecycle.TaskStateMachine/exec                task_lifecycle.clj:  859
            onyx.peer.task-lifecycle/wrap-lifecycle-metrics/fn                task_lifecycle.clj:  886
                      onyx.peer.read-batch/read-function-batch                    read_batch.clj:   18
                          io.aeron.Subscription.controlledPoll                 Subscription.java:  214
                                 io.aeron.Image.controlledPoll                        Image.java:  304
               io.aeron.ControlledFragmentAssembler.onFragment  ControlledFragmentAssembler.java:  116
                                          clojure.core/ex-info                          core.clj: 4725
clojure.lang.ExceptionInfo: Lost and regained image with the same session-id and different correlationId.

michaeldrogalis15:07:15

@maxk Which version of Onyx and the plugins are you on?

michaeldrogalis15:07:42

Did your job restart after that trace was logged, or did it go down completely?

maxk15:07:15

It went down completely. Here is the list of onyx-related libs:

[org.onyxplatform/onyx "0.10.0" :exclusions [org.slf4j/slf4j-nop]]
[org.onyxplatform/lib-onyx "0.10.0.0"]
[org.onyxplatform/onyx-kafka "0.10.0.0"]
[org.onyxplatform/onyx-seq "0.9.15.1-SNAPSHOT"]

michaeldrogalis15:07:05

Definitely a bug. You’re going to want to ditch the onyx-seq dependency since 0.9 is incompatible with 0.10

michaeldrogalis15:07:19

I’ll chat with @lucasbradstreet today and we’ll go digging. Is there anything else you can send our way?

lucasbradstreet16:07:03

@maxk onyx-seq is now included in onyx core

lucasbradstreet16:07:33

Did the job run at all, or did this happen immediately?

maxk19:07:05

@lucasbradstreet the job works fine for some time (~15 mins)

maxk19:07:38

@michaeldrogalis, will try tomorrow. Thank you for your help

lucasbradstreet19:07:09

@maxk thanks. Is it reproducible? If you set a handle-exception lifecycle (http://www.onyxplatform.org/docs/cheat-sheet/latest/#lifecycle-calls/:lifecycle/handle-exception) on the :all task, your job will stay up, but it’s pretty worrying if you’re able to make it happen in a reproducible way.
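
A minimal sketch of such a lifecycle, assuming the :my.app namespace below; the four-argument handler and the :restart / :kill / :defer return values are the documented contract:

(defn restart-on-exception [event lifecycle lifecycle-phase e]
  ;; Returning :restart reboots the task instead of killing the job.
  :restart)

(def handle-exception-calls
  {:lifecycle/handle-exception restart-on-exception})

(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls :my.app/handle-exception-calls}])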

lxsameer20:07:41

how do you guys build the Onyx docs? (the guides under the doc/user-guides directory)

lxsameer20:07:19

@michaeldrogalis I know it’s not that important, but have you guys ever considered GitBook for documentation? It supports AsciiDoc as well, and EPUB/PDF/Mobi out of the box

lucasbradstreet20:07:12

We used gitbook for a while. I think there were some details about it that made us move away, but I can’t remember what they were

michaeldrogalis20:07:17

We used to use it. Wasn’t as happy with it.

michaeldrogalis20:07:25

We needed more flexibility than it offered.

lxsameer20:07:40

good to know. I wanted to create a Mobi from the docs to read them on my Kindle, but asciidoctor’s EPUB/Mobi plugin seems really strict

michaeldrogalis20:07:13

Ah. 😕 Trade-offs everywhere with the doc tools, I’m afraid.

lxsameer20:07:15

yeah, that's right 😞

eriktjacobsen22:07:32

Regarding the suggestion of using triggers to stick global window states in an atom. A couple of questions:

1. Could you explain how :aggregation/create-state-update runs? It seems to be executed for every single segment in a given window. Does it execute on a single thread on a dedicated peer, requiring every segment to be sent to that peer, or does it run on whichever peer is processing a batch of segments? And if the latter, how does it handle race conditions? The example in the docs seems to assume that apply-state-update has run before the next invocation of create-state-update, but if that operates on every single segment without batching, it seems incredibly slow, so I think I’m missing something.

2. Does :aggregation/apply-state-update run on every peer? So if we put a side effect like setting an atom in it, would that execute and be available on every peer?

3. It looks like :aggregation/init should be deterministic and idempotent to keep the peers in sync as they each execute it, which means the init fn should not be pulling in data from a mutable store. However, if a job has to be killed and restarted, is there any way to restore state that was previously dumped into durable storage? I suppose I could have the create-state-update fn detect that the state is empty and reach out to grab the last snapshot from an external store, but that doesn’t feel like the best method.

michaeldrogalis22:07:40

@lucasbradstreet is enjoying writing his answer since he just did the work to address all of these in the last few months for the 0.10 release 😛

lucasbradstreet22:07:45

1. It will be called on each incoming segment, on the peer that actually receives the data. There are no problems with race conditions, as it runs serially on that peer. I’m not sure why it would be slow without batching, though. There’s a lot of prior work around routing the right segment to the right aggregation, and correct grouping, so batching all of the aggregation calls together won’t help all that much. There may be a few optimisations we could do there, though.
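
For reference, a user-defined sum aggregation in this style looks roughly like the sketch below; the :n segment key and var names are illustrative, while the three :aggregation/* hooks are the documented extension points:

(defn sum-init [window] 0)

(defn sum-create-state-update [window state segment]
  ;; Runs serially on the peer that receives the segment,
  ;; producing a changelog entry rather than mutating state.
  [:set-value (+ state (:n segment))])

(defn sum-apply-state-update [window state [changelog-type value]]
  ;; Applies the entry produced above to the window state.
  (case changelog-type
    :set-value value))

(def sum-aggregation
  {:aggregation/init sum-init
   :aggregation/create-state-update sum-create-state-update
   :aggregation/apply-state-update sum-apply-state-update})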

lucasbradstreet22:07:42

2. Don’t put anything side-effecting in the aggregation calls. Do it via a trigger. When we re-add log-backed durable state, we will end up replaying the aggregation calls, and things will break.
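
A hedged sketch of the trigger approach; the window id and atom are illustrative, the five-argument :trigger/sync signature follows the cheat sheet of this era, and exact trigger-spec fields vary slightly by version:

(def global-state (atom {}))

(defn write-to-atom! [event window trigger opts state]
  ;; The side effect lives in the trigger, not in the aggregation calls.
  (swap! global-state assoc (:window/id window) state))

(def triggers
  [{:trigger/window-id :my-window
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [1 :elements]
    :trigger/sync ::write-to-atom!}])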

lucasbradstreet22:07:18

3. The peers don’t have to be in “sync” as such, since the data is sharded out to the peers. You’re right that there will be consistency problems if the value is init’d from a mutable store though.

lucasbradstreet22:07:04

I haven’t thought about the patterns around 3 much. I’m open to thoughts there.

michaeldrogalis22:07:50

I think (3) could pretty effectively be pulled off with resume points.

michaeldrogalis22:07:24

I guess that might not be true — did we ever end up adding the ability for a resume point to look outside of ZooKeeper for its content?

lucasbradstreet22:07:29

Yeah, 3 would work ok if you knew what you would want to init things to at time of job start.

lucasbradstreet22:07:37

Not yet, but it’s on the cards, and we could move that up.

michaeldrogalis22:07:50

Heh, sorry. Yes, we talked about that one.

lucasbradstreet22:07:40

The idea behind what @michaeldrogalis is saying about 3 is that resume points allow you to resume/copy state from other jobs when a job starts up. Currently this has to be done from job checkpoints that live in S3/ZooKeeper, but we could just as easily allow values to be supplied via the job data, or from other data sources.
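
For context, resuming a new job from an old job’s checkpoint looks roughly like this under the 0.10 resume-point API; peer-config, old-job-id, and new-job are assumed to exist, and arities may differ slightly between versions:

(require '[onyx.api])

;; Look up the latest checkpoint coordinates for the old job, then
;; build a resume point mapping the new job's state onto them.
(let [coords (onyx.api/job-snapshot-coordinates peer-config old-job-id)
      resume-point (onyx.api/build-resume-point new-job coords)]
  (onyx.api/submit-job peer-config
                       (assoc new-job :resume-point resume-point)))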

lucasbradstreet22:07:42

Re: 1, it’s possible that we could optimise it via batching somewhat, but it’s basically a reducer, which generally ends up processing things one at a time anyway

eriktjacobsen22:07:56

Thanks for the answers! Follow-up:

1. I might be missing something fundamental here about the relationship between windows, peers, and the functions associated with a window. Going off the sum aggregation example in the docs: if two peers are operating on two incoming segments [10 15], and they both receive a state of 49, then one peer’s create-state-update would output [:set-value 59] and the other’s would output [:set-value 64]. You say it will run serially on that peer, so I would think that means all segments go to a single peer, but you also say it runs on the peer that receives the data (my assumption is therefore that multiple peers could be receiving different parts of the data).

2. Doesn’t a trigger only run on a specific peer? So wouldn’t that atom only be available on a given peer?

lucasbradstreet22:07:38

1. Right, so if you use a group-by, segments with a particular key will all end up on the same node, so you can aggregate their state together on the same peer. If you don’t use hash routing, segments will end up on random nodes. There is no coordination, so if you have a single global aggregation you’re kinda screwed unless you use a single peer. There’s no good way around this really, but you could use triggers to emit your reduced state downstream and merge it on a single peer if it’s not too large.
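
A catalog-entry sketch of that group-by routing, with illustrative task and key names:

;; Segments sharing a :user-id hash-route to the same peer, so each
;; peer aggregates a disjoint shard of the keyspace.
{:onyx/name :aggregate-per-user
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :user-id
 :onyx/flux-policy :kill
 :onyx/min-peers 4
 :onyx/max-peers 4
 :onyx/batch-size 20}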

lucasbradstreet22:07:56

2. Yes, I figured that since you were talking about atoms, you were going to end up reducing on a single peer. Otherwise you’ll have to merge it yourself, either by accessing the atom on all of the peers and merging somewhere else, or by emitting downstream as I mentioned in 1.

lucasbradstreet22:07:45

We are adding a feature called queryable state right now, which will let you access the windowed state on each peer directly from another node. That could help here, depending on your needs.

lucasbradstreet22:07:59

This all really depends on how parallelisable your problem is. It sounds like there may be a number of dependencies that make it pretty hard.

eriktjacobsen22:07:00

Ah, I see now. I mentioned I was looking for a distributed solution and “avout”, which seems to be a distributed atom. If you assumed a single peer, that makes the rest of it make sense. I didn’t realize that window aggregation state was specific to each peer; I thought other mechanisms were at play to create a single state for a global window.

lucasbradstreet22:07:22

Right, yeah. Distributed atom = really bad perf, which is fine for a lot of things, but not really for a stream processor

lucasbradstreet22:07:43

If you need that kind of thing, and it needs to be fast, you need to think about how you structure/split it up.

eriktjacobsen22:07:37

Thanks, that clears a lot up. Will think things through again