This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-07-18
Hey everyone. Could you please help me with this exception:
Lost and regained image with the same session-id and different correlationId
@maxk That’s a message related to an implementation detail we use with error. Did you see that as a failed assertion, thrown exception, or just an info log line?
17-07-18 08:44:16 Maxs-MacBook-Pro-2.local WARN [onyx.peer.task-lifecycle:141] -
java.lang.Thread.run Thread.java: 745
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java: 617
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java: 1142
...
clojure.core.async/thread-call/fn async.clj: 439
onyx.peer.task-lifecycle/start-task-lifecycle!/fn task_lifecycle.clj: 939
onyx.peer.task-lifecycle/run-task-lifecycle! task_lifecycle.clj: 466
onyx.peer.task-lifecycle/iteration task_lifecycle.clj: 448
onyx.peer.task-lifecycle.TaskStateMachine/exec task_lifecycle.clj: 859
onyx.peer.task-lifecycle/wrap-lifecycle-metrics/fn task_lifecycle.clj: 886
onyx.peer.read-batch/read-function-batch read_batch.clj: 18
io.aeron.Subscription.controlledPoll Subscription.java: 214
io.aeron.Image.controlledPoll Image.java: 304
io.aeron.ControlledFragmentAssembler.onFragment ControlledFragmentAssembler.java: 116
clojure.core/ex-info core.clj: 4725
clojure.lang.ExceptionInfo: Lost and regained image with the same session-id and different correlationId.
@maxk Which version of Onyx and the plugins are you on?
Did your job restart after that trace was logged, or did it go down completely?
It went down completely. Here is the list of Onyx-related libs:
[org.onyxplatform/onyx "0.10.0" :exclusions [org.slf4j/slf4j-nop]]
[org.onyxplatform/lib-onyx "0.10.0.0"]
[org.onyxplatform/onyx-kafka "0.10.0.0"]
[org.onyxplatform/onyx-seq "0.9.15.1-SNAPSHOT"]
Definitely a bug. You’re going to want to ditch the onyx-seq dependency since 0.9 is incompatible with 0.10
I'll chat with @lucasbradstreet today and we’ll go digging. Is there anything else you can send our way?
@maxk onyx-seq is now included in onyx core
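For reference, the dependency list above with the 0.9 onyx-seq entry dropped might look like the fragment below. This is only a sketch: it assumes a Leiningen-style `:dependencies` vector, and the versions are copied unchanged from the list posted in the chat.

```clojure
;; Sketch of the posted dependency list without onyx-seq.
;; Assumption: this lives in a Leiningen project.clj; versions are as posted.
:dependencies [[org.onyxplatform/onyx "0.10.0" :exclusions [org.slf4j/slf4j-nop]]
               [org.onyxplatform/lib-onyx "0.10.0.0"]
               [org.onyxplatform/onyx-kafka "0.10.0.0"]]
```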
Did the job run at all, or did this happen immediately?
@lucasbradstreet job works fine for some time (~15 mins)
@michaeldrogalis , will try tomorrow, thank you for your help
@maxk thanks. Is it reproducible? If you set a handle-exception lifecycle (http://www.onyxplatform.org/docs/cheat-sheet/latest/#lifecycle-calls/:lifecycle/handle-exception) on :all task, your job will stay up, but it’s pretty worrying if you’re able to make it happen in a reproducible way
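A minimal sketch of what such a lifecycle could look like, assuming the four-argument handler shape described in the linked cheat sheet entry (event map, lifecycle map, lifecycle name, exception) and a `:restart` return value. The names `restart-on-exception`, `exception-calls`, and `lifecycles` are made up for illustration.

```clojure
;; Hypothetical handler: always ask Onyx to restart the task instead of
;; killing the job. Assumed return values are :restart, :kill, or :defer.
(defn restart-on-exception
  [event lifecycle lifecycle-name exception]
  :restart)

;; Calls map that the lifecycle entry below points at.
(def exception-calls
  {:lifecycle/handle-exception restart-on-exception})

;; Lifecycle entry applied to every task in the job via :all.
;; ::exception-calls expands to a keyword in the current namespace,
;; which must be the namespace where exception-calls is defined.
(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls ::exception-calls}])
```

The `lifecycles` vector would then be supplied under the job's `:lifecycles` key at submission time. Restarting unconditionally hides the underlying bug, so logging the exception inside the handler is probably worth adding.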
How do you guys build the Onyx docs? (The guides under the doc/user-guides directory.)
@lxsameer https://github.com/onyx-platform/onyx-platform.github.io/blob/master/build-site.sh#L31
@michaeldrogalis I know it's not that important, but have you guys ever considered gitbook for documentation? It supports asciidoc as well, and EPUB/PDF/Mobi out of the box.
We used gitbook for a while. I think there were some details about it that made us move away, but I can’t remember what they were
We used to use it. Wasn’t as happy with it.
We needed more flexibility than it offered.
good to know, I wanted to create a Mobi from the docs to read them on my kindle, but asciidoctor's epub/mobi plugin seems really strict
Ah. 😕 Trade-offs everywhere with the doc tools, I’m afraid.
Regarding the suggestion of using triggers to stick global window states in an atom, a couple of questions:
1. Could you explain how :aggregation/create-state-update runs? It seems to be executed for every single segment for a given window, so does that execute on a single thread on a dedicated peer and require every single segment to be sent to that peer, or does it run on whichever peer is processing a batch of segments... and if so, how does it handle race conditions? The example in the docs seems to assume that the apply-state-update has been run before the next invocation of the create-state-update, but if that operates on every single segment without batching, that seems incredibly slow, so I think I'm missing something.
2. Does :aggregation/apply-state-update run on every peer? So if we put a side effect in it, like setting an atom, would that execute and be available on every peer?
3. It looks like the :aggregation/init should be deterministic and idempotent to keep the peers in sync as they each execute this, which means the init fn should not be pulling in data from a mutable store. However, if a job has to be killed and restarted, is there any way to restore state that was previously dumped into durable storage? I suppose I could have the create-state-update fn detect if the state is empty and reach out to grab the last snapshot from an external store, but that doesn't feel like the best method.
@lucasbradstreet is enjoying writing his answer since he just did the work to address all of these in the last few months for the 0.10 release 😛
1. it will be called on each incoming segment on the peer that actually receives the data. There are no problems with race conditions, as it will run in serial on that peer. Not sure why it would be slow without batching though. There’s a lot of prior work around routing the right segment to the right aggregation, and correct grouping, so batching all of the aggregation calls together won’t help all that much. There may be a few optimisations we could do there though.
2. Don’t put anything side effecting in the aggregations calls. Do it via a trigger. When we re-add a log backed durable state we will end up replaying the aggregations calls, and things will break.
3. The peers don’t have to be in “sync” as such, since the data is sharded out to the peers. You’re right that there will be consistency problems if the value is init’d from a mutable store though.
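As a rough illustration of point 2, the side effect can live in a trigger sync function rather than in the aggregation calls. The sketch below assumes the five-argument sync shape (event map, window map, trigger map, state-event map, window state) and typical trigger keys from the 0.10-era docs. The names `window-states`, `write-window-state!`, `:sum-window`, and `:sync-to-atom` are hypothetical, and the trigger map may need additional version-specific keys, so check it against the cheat sheet.

```clojure
;; Peer-local atom that receives the latest state per window extent.
(def window-states (atom {}))

;; Assumed :trigger/sync shape: event, window, trigger, state-event, state.
;; The extent bounds are read from the state-event map.
(defn write-window-state!
  [event window trigger {:keys [lower-bound upper-bound]} extent-state]
  (swap! window-states assoc
         [(:window/id window) lower-bound upper-bound]
         extent-state))

;; Hypothetical trigger entry: fire every 5 segments and call the sync fn.
(def triggers
  [{:trigger/window-id :sum-window
    :trigger/id :sync-to-atom
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [5 :elements]
    :trigger/sync ::write-window-state!}])
```

The atom is only visible on the peer where the trigger fires, which is the limitation discussed further down in the thread.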
I haven’t thought about the patterns around 3 much. I’m open to thoughts there.
I think (3) could pretty effectively be pulled off with resume points.
I guess that might not be true — did we ever end up adding the ability for a resume point to look outside of ZooKeeper for its content?
Yeah, 3 would work ok if you knew what you would want to init things to at time of job start.
Not yet, but it’s on the cards, and we could move that up.
Heh, sorry. Yes, we talked about that one.
The idea with what @michaeldrogalis is saying with 3, is that resume points allow you to resume/copy state from other jobs when the jobs start up. Currently this has to be done from job checkpoints that are in S3/ZooKeeper, but we could just as easily allow values to be supplied via the job data, or other data sources to be supplied
RE: 1 it's possible that we could optimise it via batching somewhat but it's basically a reducer which ends up generally processing things one at a time anyway
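To make the "basically a reducer" remark concrete, here is a plain-Clojure sketch that needs no Onyx at all. It imitates the sum example from the docs with hypothetical function names and a hypothetical `:sum-key` entry on the window map, and folds segments through create-state-update and apply-state-update one at a time, the way a single peer would.

```clojure
;; Hypothetical stand-ins for the three aggregation calls.
(defn sum-init [window] 0)

(defn sum-create-state-update [window state segment]
  ;; Produce a changelog entry rather than mutating anything.
  [:set-value (+ state (get segment (:sum-key window)))])

(defn sum-apply-state-update [window state [op value]]
  (case op
    :set-value value))

;; One peer processes its segments serially, so each update is applied
;; before the next create-state-update sees the state.
(defn run-serially [window segments]
  (reduce (fn [state segment]
            (sum-apply-state-update
             window state
             (sum-create-state-update window state segment)))
          (sum-init window)
          segments))

(run-serially {:sum-key :n} [{:n 10} {:n 15}])
;; => 25
```

Because the fold is strictly sequential on one peer, there is nothing to race against. Batching would mainly save per-call overhead.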
Thanks for the answers! Followup:
1. I might be missing something fundamental here about the relationship between windows, peers, and functions associated with that window. Going off the sum aggregation example in the docs... if two peers are operating on two incoming segments [10 15], and they both receive a state of 49, then one peer's create-state-update would output [:set-value 59] and another peer's would output [:set-value 64]. You say it will run in serial on that peer, so I would think that means all segments go to a single peer, but you say it will run on the peer that receives the data (my assumption is therefore that multiple peers could be receiving different parts of the data).
2. Doesn't a trigger only run on a specific peer? So wouldn't that atom only be available on a given peer?
1. Right, so if you use a group-by, segments with a particular key will all end up on the same node, so you can aggregate their state together on the same peer. If you don't use hash routing then you will end up with segments randomly ending up on particular nodes. There is no coordination so if you have a single global aggregation then you're kinda screwed unless you use a single peer. There's no good way around this really, but you could use triggers to emit your reduced state downstream and merge it on a single peer if it's not too large.
2. Yes, I figured that since you were talking about atoms you were going to end up reducing on a single peer. Otherwise you are going to have to merge it yourself, either by accessing the atom on all of the peers and merging from somewhere else, or emitting downstream as I mention in 1
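The routing and merge-downstream idea from these two answers can be simulated in plain Clojure. This is a sketch only: it does not use Onyx, and `route-to-peer`, `peer-sums`, `partial-states`, `merge-downstream`, and the `:user`/`:amount` keys are all made up for illustration. Segments are hash-routed by key so each key's state lives on exactly one simulated peer, and the per-peer partial sums are then merged in one place.

```clojure
;; Pick a peer for a segment by hashing its grouping key.
(defn route-to-peer [n-peers segment]
  (mod (hash (:user segment)) n-peers))

;; What a single peer does: a serial per-key sum over its own segments.
(defn peer-sums [segments]
  (reduce (fn [sums {:keys [user amount]}]
            (update sums user (fnil + 0) amount))
          {}
          segments))

;; Shard the segments across peers and reduce each shard independently.
(defn partial-states [n-peers segments]
  (->> segments
       (group-by #(route-to-peer n-peers %))
       vals
       (map peer-sums)))

;; Downstream merge on a single peer, as a trigger emit would feed it.
(defn merge-downstream [partials]
  (apply merge-with + partials))

(def segments
  [{:user :a :amount 10} {:user :b :amount 15}
   {:user :a :amount 5}  {:user :c :amount 1}])

(merge-downstream (partial-states 3 segments))
;; => {:a 15, :b 15, :c 1} (key order may differ)
```

With key-based routing no two peers ever hold state for the same key, so the merge is conflict-free. A single global sum is then just `(reduce + (vals merged))`, computed on whichever single peer receives the emitted partial states.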
We are adding a feature called queryable state right now which would let you access the windowed state on each peer directly from another node. That could help here depending on your needs
This all really depends on how parallelisable your problem is. It sounds like there may be a number of dependencies that makes it pretty hard.
Ah, I see now. I mentioned I was looking for a distributed solution and "avout", which seems to be a distributed atom. If you assumed a single peer, the rest of it makes sense. I didn't realize that window aggregation state was specific to each peer; I thought other mechanisms were at play to create a single state for a global window.
Right, yeah. Distributed atom = really bad perf, which is fine for a lot of things, but not really for a stream processor
If you need that kind of thing, and it needs to be fast you need to think about how you structure/split it up.
Thanks, clears a lot up. Will think things through again
Sure thing