
Hi! I’m looking into onyx-redis, and I found :onyx/plugin :onyx.plugin.redis/reader in the docs, but I didn’t find onyx.plugin.redis/reader itself in the code. Is that expected? It looks like it doesn’t work...


@rasom: Apologies, that should not exist


I’ll fix that, thanks for catching it


So, do you mean that onyx-redis will not support any kind of input tasks from redis?


We removed the redis reader because it did not allow for safe checkpointing that other readers support


You can still use onyx-seq or the core-async plugin to read from redis and inject segments that way
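To make that concrete, here’s a rough sketch of feeding redis data into a core.async input task. This is an assumption-heavy illustration, not the official plugin setup: it assumes the Carmine redis client and the onyx-core-async input plugin, and the connection details, key name, and channel wiring are all hypothetical.

```clojure
;; Sketch: reading from redis via the core.async input plugin.
;; Assumes Carmine as the redis client; key/host names are hypothetical.
(require '[clojure.core.async :as a]
         '[taoensso.carmine :as car])

(def in-chan (a/chan 1000))

;; Pump redis list entries onto the channel. A lifecycle would inject
;; in-chan into the :in task below.
(future
  (loop []
    (if-let [v (car/wcar {:pool {} :spec {:host "127.0.0.1" :port 6379}}
                 (car/lpop "my-segments"))]
      (a/>!! in-chan (read-string v))
      (Thread/sleep 50)) ; nothing queued, back off briefly
    (recur)))

;; Catalog entry for the core.async input task:
(def catalog-entry
  {:onyx/name :in
   :onyx/plugin :onyx.plugin.core-async/input
   :onyx/type :input
   :onyx/medium :core.async
   :onyx/batch-size 10
   :onyx/max-peers 1
   :onyx/doc "Reads segments from a core.async channel fed by redis"})
```

Note that this inherits the core.async plugin’s checkpointing behavior, not anything redis-specific, which is exactly the safety caveat discussed above.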


We did not want folks to get confused on the safety guarantees offered by the redis plugin.


ok, i see, thanks


If you need to read segments from redis I can help out with that a little later. I’d love to include a section on that in the redis docs for now, until we can get around to making the input safe.


Stepping out right now though

here is another problem: [org.onyxplatform/onyx-kafka ""] doesn’t exist on Clojars


@rasom thank you for the help, I went through the rest of the plugins for the 0.9.x branches and they should be good. Please let me know if you see any other discrepancies, seems like there was a problem with the release scripts this time around.


ok, thanks


Apologies for the big exception dump, but I’m having some trouble with a Kafka input stream with fixed windows.


I’m assuming it’s not finding Zookeeper though it’s running locally.

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /onyx/a654f33d-261f-47f6-906d-f58f4a6f6c79/ledgers/available/readonly
    code: -101
    path: "/onyx/a654f33d-261f-47f6-906d-f58f4a6f6c79/ledgers/available/readonly"


Unfortunately this is a problem with BookKeeper, if you restart a job quickly BK will not ignore/overwrite the previous ZooKeeper node.


Is there a preferred workaround?


The only real workaround is to wait for the ephemeral node to expire


ok, thanks for the quick response, appreciate it


Unfortunately that’s not a super great workaround 😕


I usually develop against an in-memory zookeeper instance and just teardown/setup when I’m iterating
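For reference, here’s roughly what that in-memory setup looks like in a 0.9-era env config. This is a hedged sketch: the port and the fresh-UUID-per-run trick are illustrative choices, not required values.

```clojure
;; Sketch: env config that starts an embedded, in-memory ZooKeeper,
;; so teardown/setup gives a clean slate on each iteration.
(def env-config
  {:onyx/id (java.util.UUID/randomUUID) ; fresh id per run avoids stale /onyx/<id> nodes
   :zookeeper/address "127.0.0.1:2188"
   :zookeeper/server? true              ; run ZK in-process
   :zookeeper.server/port 2188})

(def env (onyx.api/start-env env-config))
;; ... submit and run your job ...
(onyx.api/shutdown-env env)
```

Because the tenancy id changes every run, the leftover BookKeeper/ZooKeeper state from a previous job simply never gets looked at.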


Thanks @gardnervickers - it’s all helpful. I’ll take a look. Thanks for your time.


The problem is on our radar and being worked on


@michaeldrogalis: The dataflow model supports a refinement mode called “Accumulating & retracting”; is this something Onyx supports?


@leonfs: Not yet. We have an open ticket for that one. The implementation is complicated.


I know the DataFlow model had it in the paper since the beginning, but when did Retraction support land in Google DataFlow itself? It wasn't there for quite a bit.


@michaeldrogalis: I truly don’t know. I’ve been reading the paper and I struggle to understand the motive for retractions, so I thought maybe it would be better explained in the Onyx documentation, only to find out that it hasn’t been implemented yet..


You would use retractions if you wanted to answer a question like “What’s the highest temperature read by the sensors over 10-minute windows?”


Where data can arrive arbitrarily late


@leonfs: Think of it like this. Say you have an event stream ingesting vote counts from an election. You get a message that says 10 votes for Bernie Sanders, so you add that to your running total. Then later in the day, the person who sent the message realizes they screwed up, so they send a new message that says Bernie gets 12 votes. You need to retract the previous assertion of "10 votes" from your count, apply a new assertion of "12 votes", and then (here's the kicker) retract and reassert all downstream state updates that depend on the counter change.


It basically covers scenarios where upstream producers either have an "oops", "nevermind, I changed my mind", or "I learned something new about what I previously told you" case.
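The vote-correction scenario above can be modeled as retract/assert pairs folded into a running total. A minimal sketch, with hypothetical event shapes:

```clojure
;; Minimal sketch: a correction expressed as a retract/assert pair.
;; Event keys and shapes are hypothetical.
(def events
  [{:op :assert  :candidate "Sanders" :votes 10}
   {:op :retract :candidate "Sanders" :votes 10}  ; oops, wrong count
   {:op :assert  :candidate "Sanders" :votes 12}]) ; corrected value

(defn apply-event [totals {:keys [op candidate votes]}]
  (update totals candidate (fnil + 0)
          (case op
            :assert  votes
            :retract (- votes))))

(reduce apply-event {} events)
;; => {"Sanders" 12}
```

The hard part Onyx would have to solve is not this local fold, but propagating the retract/assert pair through every downstream aggregation that consumed the original "10 votes".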


Cool.. thanks for the examples, it’s clearer now..


Do you have a conceptual idea on how you could implement it?


I haven't done any design work for it yet. I think @gardnervickers may have had a think about it though?


@leonfs: I wrote this up a while ago to check my understanding, not sure how useful it will be


@gardnervickers: great.. I will definitely have a look at your notes..


But retractions are tricky. Technically Onyx does support them currently if you specify a custom refinement, but that’s just because Onyx does not yet support serial aggregations.


retractions are not really interesting until you can have multiple serial aggregations though


This is a poor solution though because whatever you come up with would have to be application specific. If you’re making a vote counter, your retraction would be a segment that’s -1 instead of +1. If you’re making a temperature data ranking system, a retraction would be dissoc’ing the temperature value.
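The two application-specific retraction shapes just mentioned look quite different in code. A sketch, with hypothetical function and state shapes:

```clojure
;; Sketch of why retractions are application-specific.

;; Vote counter: a retraction is just a negative delta.
(defn retract-votes [totals candidate n]
  (update totals candidate (fnil - 0) n))

;; Temperature ranking: a retraction removes the reading entirely.
(defn retract-reading [readings sensor-id]
  (dissoc readings sensor-id))
```

There is no single operation that covers both; each aggregation type needs its own inverse, which is exactly the per-user burden being described.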


yep, the paper talks about persisting the emitted value


on the next trigger, before submitting the new value, emit the persisted old value


they rewind essentially


You would draw a line in time and say “any data before T I don’t care about, it’s too old; any data after T gets re-incorporated into the window it’s supposed to be in”. This would put the burden on Onyx to maintain all after-T segments at the first aggregation.


I believe T is the watermark in their paper


I've also never found many cases where I needed retractions, but that's primarily because for a long time I've operated on the principle of using immutable logs. So in the case of votes, fixing the mistake is itself an event in the stream, and thus the fix has its own t associated with it, along with events that carry the state to decrement the votes. Mistakes and changes are, in my experience, always part of processing any data stream, so it's best to treat them as pure data, i.e. roll forward the corrections, changes, whatever.

A system that can't do that is at minimum at risk when you upgrade to, say, a new data format or conversion formula anyway. It's useful to know when that stuff happened rather than mutating anything directly, as the mistakes are often useful information. No special mechanism is generally needed if it's just a matter of running something that streams the same values and applying some function that takes data (ex: deltas). The only gotcha I've found is avoiding situations where you cannot replay a stream in the same manner (ex: not idempotent, bad side effects, etc), but that's usually a sign of bad design.
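The roll-forward idea can be sketched as a fold over an immutable log where the correction is a first-class event with its own timestamp. Names and event shapes here are hypothetical:

```clojure
;; Sketch: corrections as first-class events in an immutable log.
;; Nothing is mutated or rewound; the fix just appends with its own t.
(def log
  [{:t 1 :type :vote-batch :delta 10}
   {:t 2 :type :vote-batch :delta 7}
   {:t 3 :type :correction :delta -2 :reason "double-counted"}])

(defn total [log]
  (transduce (map :delta) + 0 log))

(total log)
;; => 15
```

Replaying the log always yields the same answer, which is the replayability property called out at the end of the message above.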


Retraction support of the type expressed in the data flow paper is necessitated by unbounded streams that cannot guarantee event-time ordering.


Even with a log you still need to use retractions for certain types of queries


I'm not saying they aren't needed, just they aren't needed often


well, unless all your streams are as you describe, of course; but in practice I haven't seen it much unless there's some awful design or someone overcomplicating things


if you can make it work, it certainly is elegant, but not easy to solve


Got some coffee and thought about this some more; I think we’re talking about two different things. You need the ability to assert retractions for certain types of queries no matter your design. With CQRS/ES (immutable log processing) you’re still asserting your retractions; instead of being persisted across the network they’re sitting in Kafka. If you distribute your processing you’ll hit the same complications that Onyx is going to hit.


I think we are talking about different things. I'm lost with regard to CQRS/ES + Kafka; I'm not sure how Kafka fits into the issue of retractions. I do use Kafka in CQRS/ES, but as a command/event queue. Actual events are written to an event store. If say 10 fake votes were cast that shouldn't count, those 10 presumably will have already been processed.

Once you emit an event in CQRS, that's it, you cannot ever reject/retract it; that should be done by the command handler or before the command is issued. Once the command is accepted, in most implementations that means an event was/will be emitted. If the event is sitting in Kafka, it needs to be guaranteed to be written. Kafka is only there to preserve order and to resist crashes before an event is written to the store.

You could theoretically reject the event before it is written to the event store, by dropping it on the floor if you knew those 10 votes were fake and which events they correspond to, but that's in my experience a bad idea. Instead, you'd simply issue a new command to remove 10 fake events, which would emit events that subtract those events and probably also emit fake vote events so you know what happened. That way you can see there was potential "fraud" and take further action if necessary. The hard part, and where the formalized stuff comes in, is exactly as implied: order. But if your data is already out of order, writing it to Kafka has no impact unless you plan on somehow windowing and emitting the results again in order.


> Instead, you'd simply issue a new command to remove 10 fake events, which would emit events that subtract those events and probably also emit fake vote events so you know what happened.


You issued a retraction 😉


writing more, but I'm differentiating between a stream processor doing complex retraction at a low level and math-like retraction


that's my point, not that retractions should never happen. You need and must have basic retractions


Where I'm saying it goes wrong is when people add tons of overhead because of that in-between time between the retraction data and the current data


that is, they want to either drop the data in the pipeline on the floor or have some kind of complicated rewind for example


I've had experience with this at places like eBay/PayPal, banks, financial systems, ads, airplane data, and government services. Most of these domains are classic cases for retractions. The funny thing is that 99% of the time when someone said "hey, we need the system to do xyz complicated thing", you could do it by just using some simple math/state to set things right. I'll give the example of ads, because for whatever reason ad people seem obsessed with doing things like this: is the cost of 10 seconds of missed ad revenue worth the ad system going 50% slower? Of course not; you make more money even if it's crap for a few seconds than you do trying to make something perfect.


We saw this all the time in financial systems as well. The simple case almost always wins out for business benefit even if on a technical level it makes people afraid something is wrong. When you add low-level complex logic and things in there to fix otherwise easily fixable things, you end up adding in other problems, usually the bad ones you don't know about


The only system I've ever worked on that needed something super crazy for sure was related to fighter jet data because accuracy was everything


another example: a colleague of mine was at eBay/PayPal and they had such a complex pipeline that all the technical magic they were working on really pointed to the pipeline sucking and needing to be simplified/broken up, again eliminating a lot of complex retraction-related lower-level logic


in that case the pipeline would take too long and get into weird states, so the solution was more about making it run faster with more discrete roles, and no more restarts ever


Two things. 1. Take the scenario where you’re counting votes over 10-minute spans, splitting those votes by district, and getting an average income. You realize 5 of those votes are fake. How do you fix the state of the system?


Each task would have to know how to emit its own retraction; that’s a ton of custom logic multiplied by every Onyx user


2. It is true that for domain problems, your approach is simpler, but so is running everything on a giant EC2 instance. As a framework Onyx needs to be able to handle this by keeping track of what segments generated what downstream state.