
I'm trying to understand how Onyx handles loss of individual nodes or of the entire cluster, and options for restoring from snapshots. Context is pure calculations (Kafka input, Kafka output, stateful processors, no arbitrary side-effects). Replaying some messages is fine, because state updates can be made idempotent, but I suspect that replaying all messages would cause too much downtime and inefficiency.

The LambdaJam video gives an example of a worker node disconnecting (11:20) and losing a message, and having that message automatically reprocessed after the lost node is restored. It also gives the example of a root node disconnecting (12:00), then picking up where it left off because the input queue remembers which messages were not acknowledged yet. I'm under the impression that Kafka is considered a good input source for stream processing, but that it does not provide any such message acknowledgement capability. Storing Kafka offsets is insufficient because when messages are distributed for parallel processing across multiple worker nodes, the messages will not be processed in order. Is it still possible to recover after a root node disconnects?

Also, the failures in the video involved individual nodes. If there is a cluster-wide outage, does Onyx have a "best practice" for restoring from snapshots and minimizing the need for replay? It sounds like snapshotting the "acking daemon" might help with this, but that sounds like an Onyx implementation detail that I might not have access to. Are there documentation or examples that I should read to understand these issues?


@ericlavigne: Onyx has no opinion about the underlying storage medium’s implementation details. In the case of Kafka, consumer offsets are global and do not depend on consumer count; an offset is simply a location in the log of messages. Therefore when a peer fails, or all peers fail, reading can begin again from the last recorded offset.


That would make sense to me if messages were processed one at a time. But there could be 10 peers working on messages from the same topic/partition at the same time. The last recorded offset may be later than the earliest missed message.


And Onyx may have read far ahead of what it passed to peers so far, right?


Just to clarify, offsets are only recorded once a message has completed its flow from input -> output. Sorry if that’s redundant, just trying to clarify your meaning of “later”.


So if message 1 and message 2 both are pulled from input, and message 2 is fully processed, but message 1 is not fully processed, then the "committed" offset doesn't advance at all?


If message 1 cannot trigger an ack (i.e., hit the end of the workflow), it will be retried instead of checkpointed.


The system will not acknowledge that message 1 has been processed until it has been fully processed


Onyx guarantees at-least-once processing; segments may be sent down the workflow multiple times.


And since Kafka's progress is only an offset, that means that message 1 and message 2 would both be replayed when the system comes back up.


Okay, that helps. I didn't understand that part.
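
The exchange above can be sketched in code. This is a toy illustration, not Onyx's or Kafka's actual implementation: the committed offset can only advance past a contiguous prefix of fully-acked messages, which is why an unfinished message 1 holds back the checkpoint even after message 2 completes.

```python
def committed_offset(last_committed, completed):
    """Advance the committed offset only while the next message is done.

    `last_committed` is the highest offset known to be fully processed;
    `completed` is the set of offsets that have finished the workflow.
    """
    offset = last_committed
    while offset + 1 in completed:
        offset += 1
    return offset

# Message 2 finished, but message 1 did not: the offset cannot move,
# so both 1 and 2 are replayed after a restart (at-least-once).
print(committed_offset(0, {2}))      # -> 0

# Once message 1 also completes, the offset jumps past both.
print(committed_offset(0, {1, 2}))   # -> 2
```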


Yes. This is not the case, however, when using stateful aggregations


Onyx’s windowing features


Those provide exactly-once guarantees


Exactly once in the sense that messages will be replayed, but that Onyx takes care of idempotency, right?


I think I'm also confused about how the Kafka offset storage works in a shutdown. I thought offset was stored by clients rather than in Kafka server. Do I need to take care of explicitly recording those somewhere?


No, that’s why Kafka relies on ZooKeeper.


It records offsets in ZK


I can check when I get home later, but I believe the Onyx plugin does something similar


You won't have to manage offsets at all; you just have to decide at the start of your job if you want to start from the last known offset, a specific offset, or from the beginning


And that setting (last known offset, a specific offset, or from the beginning) goes in the Onyx catalog entry for the Kafka plugin?


One moment, on cellphone wifi (Comcast has been giving me the run-around)


It looks like you can't specify a specific offset, but :kafka/offset-reset takes either :smallest, :largest, or false (for the last recorded offset).


Those are specified through the catalog map yes
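
For reference, here is a hedged sketch of what such a catalog entry might look like. The key names other than :kafka/offset-reset are illustrative and may differ between onyx-kafka versions, so check the plugin's README for your version:

```clojure
;; Illustrative catalog entry for an onyx-kafka input task.
;; Exact keys may vary by plugin version; see the onyx-kafka README.
{:onyx/name :read-messages
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "my-topic"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/offset-reset :smallest ;; or :largest, or false to resume from
                               ;; the last offset checkpointed in ZK
 :onyx/batch-size 100}
```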


So I would guess that smallest is the conservative option that I would probably use.


Having it false would most likely be the most conservative option


or nil, of course


Because it would resume from where the last failure was


or rather, from at-least-before where the last failure was


do you know what I’m saying?


at-least-before-last-failure sounds perfect. I'm just finding the wording a bit awkward.


yea haha sorry my fault


Main thing though is that you've pointed to the right place in the docs. You've taught me enough that I can go back to the docs with fresh eyes. Thank you. simple_smile


@ericlavigne: If I can summarize quickly (I'm also on the move, wooo Friday night!), you should never experience an unprocessed message using the Kafka plugin. The plugin takes care of checkpointing the offset into ZooKeeper for you, and you can parameterize what to do if there's a failure and replays need to happen. Right now Onyx is at-least-once, as @gardnervickers said. You can turn on "exactly once" via windowing, where Onyx takes care of idempotent stateful updates for a certain set of aggregation operators.
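
As a rough illustration of the windowing route: a window entry is another map in the job, alongside the catalog. The shape below is representative of Onyx's windowing API rather than copied from a verified example, so treat the keys as assumptions and consult the Onyx windowing documentation:

```clojure
;; Illustrative window entry: a global count aggregation over a task.
;; Key names are assumptions; check the Onyx windowing docs.
{:window/id :count-segments
 :window/task :identity
 :window/type :global
 :window/aggregation :onyx.windowing.aggregation/count}
```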


In the LJ talk, I was discussing the current implementation of the stream engine that we use - record-at-a-time. We're moving to a spin-off of Asynchronous Barrier Snapshotting, which is itself based on Chandy/Lamport's distributed snapshotting algorithm. If you're curious, the papers are a good read.


Last thing is that order is almost never guaranteed in Onyx, or any of its cousin systems. You couldn't get good performance if you cared about processing in order all the way through.


Thanks, Michael. Sounds great. I'm more than willing to give up ordering for speed. I was just confused about how failure recovery worked.