
@yonatanel Maybe it wasn’t given in the context of the discussion, but I was speaking about the aggregation functions themselves, in that they shouldn’t rely on external state that could possibly change. :aggregation/init is just the initializer for your aggregation, and it’s handed your window map. It’s a general-purpose Clojure function, so you can do any sort of data fetch you’d like.


That said, there’s no support for individual group initializations, but there’s nothing preventing you from initializing all your groups at once through :aggregation/init
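For what it’s worth, a custom Onyx aggregation is just a map of plain functions, so whole-group initialization can live in :aggregation/init. A minimal word-count sketch — the function names here are hypothetical, chosen for illustration:

```clojure
;; A minimal sketch of a custom Onyx aggregation. :aggregation/init is
;; an ordinary Clojure function handed the window map; returning {}
;; means "no groups seen yet", but you could seed all groups here at
;; once (pulling in mutable external state is discouraged, though).
(defn init-counts
  [window]
  {})

(defn create-state-update
  [window state segment]
  ;; Describe the change this segment causes, as data.
  [:add (:word segment)])

(defn apply-state-update
  [window state [op word]]
  (case op
    :add (update state word (fnil inc 0))))

;; The aggregation is wired together as a plain map of these functions.
(def word-count-aggregation
  {:aggregation/init init-counts
   :aggregation/create-state-update create-state-update
   :aggregation/apply-state-update apply-state-update})
```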


So even though I couldn't pass a live db connection to the aggregation, I could pass a connection string, and connect from within any aggregation function. Sounds like it revokes the already non-existent warranty ;)


I don't think Michael likes that idea


Exactly, it’s a bad idea


Onyx is not designed to be your datastore


I don't see it as one. It needs only enough state to handle incoming commands.


Seems like it isn't supposed to have state that isn't fed through input tasks


Even Akka initializes actors with messages. Perhaps it should be the input plugin's responsibility.


@yonatanel If you can write up an issue on GitHub with a detailed description of what the problem is in particular, I can give it more thought. I’m wary of initializing windows with arbitrary state - state is meant to be accreted over immutable event streams, and grabbing values from the side is a gateway to losing that benefit.


Onyx’s design is in opposition to Actors for what it’s worth. Trying to make one design fit the other would be troublesome.


@michaeldrogalis Exactly. I'm trying to understand the Onyx way to do what I need. I was actually saying that even Akka persistent actors don't just grab state from anywhere but use events, so there's no reason to expect it in something like Onyx.


@michaeldrogalis There was a discussion with Lucas some time ago where he told me to open an issue, and this is it:


@yonatanel Even if you could grab arbitrary state to supply an aggregate’s initial value, what “state” is “the right state”? The hash routes between peers are indeterminate, and each peer’s state is a partial view of the global state.


I thought I should use a constant peer number and "recover" flux policy in that case.


So suppose you have a windowed task with 3 peers on it, and segments are being hash routed across those 3 peers. Then those peers sync their state to a global storage. You bring your peers back online later. Who gets what state? Do they all get a global copy of the state?


That's a fragile approach, because once you start a job, you can never scale your resources up or down


How does even a simple word count work under dynamic scaling?


If an aggregation is a bank account and I don't have the entire event trail on the same peer, how can I know if a withdrawal is valid?


You can’t add peers to a running windowed task because it wouldn’t be able to consistently route a key across those new resources, but those peers are presumably writing Word->Count pairs to durable storage. When a new job comes up, with more resources on the windowed tasks, those peers are summing the permanently written values with their new windows.
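The summing step described here could look something like the sketch below — `durable-count` is a hypothetical lookup against whatever store the old job wrote its Word->Count pairs to (an atom stands in for it here):

```clojure
;; Sketch: fold durable Word->Count pairs into the new job's transient
;; window counts, rather than seeding the window with external state.
(defn durable-count
  [store word]
  ;; Hypothetical: look up the last persisted count for `word`.
  (get @store word 0))

(defn merge-counts
  [store window-counts]
  ;; Combine the freshly accrued window aggregate with durable storage.
  (reduce-kv (fn [acc word n]
               (assoc acc word (+ n (durable-count store word))))
             {}
             window-counts))
```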


I think you’re squarely in application design territory, Onyx can’t do much for you here.


Regarding word count, how do the job's peers get the permanently written values?


I highly recommend reading the Dataflow paper, which all of this is based on:


OK I will. Just to make sure, you're saying trigger/sync can help me fetch state from durable storage upon a new job?


I think I see the confusion here. You’re trying to maintain an aggregate and accrete your data inside of Onyx, periodically snapshotting it to durable storage. This is what I was getting at when saying that Onyx is not meant to be your datastore. Treat Onyx aggregations as transient state that can be sync’d with an external datastore to update it.


You have access to the Event map inside :trigger/sync; you can do anything there.
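A hedged sketch of what that looks like — the five-argument sync signature shown here matches older Onyx releases and may differ in your version, and `write-to-store!` is a made-up name:

```clojure
;; A trigger that syncs window state outward. The sync fn receives the
;; full Event map plus the window, trigger, opts, and current aggregate,
;; so the whole runtime context is available at sync time.
(defn write-to-store!
  [event window trigger opts state]
  ;; Stand-in side effect; a real job would write `state` to a DB here.
  (println "syncing" (:window/id window) state))

(def word-count-trigger
  {:trigger/window-id :word-counts
   :trigger/refinement :onyx.refinements/accumulating
   :trigger/on :onyx.triggers/segment
   :trigger/threshold [5 :elements]
   :trigger/sync ::write-to-store!})
```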


@michaeldrogalis I can't influence the aggregation state from the sync function. I can only get data out. I can of course run a query and put the result onto Kafka, where it will be picked up by the aggregation later...


We’re going in circles here. The Dataflow model is explicitly designed to not let you pull external state into a window.


or pick up the data in your task prior to the window call


There's an idea


@gardnervickers It's the other way around. I'm saving individual events in durable storage and keeping only the aggregation in Onyx. I also snapshot from time to time, but that's another story.


Yup, I got that. I’m saying you’d be better served not storing your entire aggregate in Onyx, and instead using Onyx’s aggregation features to build up temporary aggregates that you can then merge with other DBs to create your “materialized view” of the event stream, in keeping with CQRS/ES parlance
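In other words, something like this sketch — Onyx emits small transient aggregates, and a separate step folds them into a durable materialized view (an atom stands in for the real DB here):

```clojure
;; Fold transient Onyx aggregates into a durable "materialized view".
(def materialized-view (atom {}))

(defn merge-aggregate!
  [view transient-aggregate]
  ;; merge-with + keeps per-word counts additive across syncs.
  (swap! view #(merge-with + % transient-aggregate)))
```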


It is a confusing issue and I should make a diagram of the exact use case because I didn't explain myself properly and it results in conflicting advice :)


@gardnervickers Interesting. I will think about it.


@gardnervickers Though I don't understand how I would split a transaction boundary. To handle an incoming command I need the whole transaction boundary, the domain aggregate, which is for example the current state of a bank account. I can't do that with partial sums of the same bank account scattered across peers, especially since not all commands can succeed, such as withdrawing money without enough available funds.


That’s particular to how you design your application though right?


@gardnervickers Yes, though it seems like classic CQRS from all that I've read.


@yonatanel would this become easier if triggers could send aggregates on to other tasks?


I think that could be the fundamental issue here


Correct me if I’m wrong but I think that @yonatanel aims to structure his application as a giant fold over previous event history, but in the case things go down he wants to be able to pick back up from a snapshot instead of paying the price of a cold-start, re-processing all of history. I think what he wants to do is provide one of his aggregation backups as a starting point in the fold/reduction.


@gardnervickers My input is commands and the output is events. When the job "recovers" it can either read the entire history of events or use a snapshot. It's an implementation detail. After that start, it keeps processing commands and outputting events.


Exactly, so you’re asking for support of that particular implementation detail?


@gardnervickers Maybe the confusion is in commands vs. events. Once I process a command and it becomes an event, that command can be deleted forever while the event stays somewhere. When the job recovers, I don't want to reprocess all commands, but to start from the last aggregated state, which is all the events applied. Does that still sound the same as before?


Yea I’m talking about the stuff you’re storing in your log


Command/events, your basis of truth, entire history, etc.


Applying all events from history seems to me just as easy as applying a snapshot first and then all the younger events. I think it's solved already.


I'll read that paper to be on the same page.


Hm, I see in abs-engine that the Messenger protocol has start/stop methods, but I don't see implementations.