Does onyx-local-rt support custom aggregations? I'm getting `IllegalArgumentException Wrong number of args passed to keyword: :fully-qualified.aggregation/create-state-update-fn clojure.lang.Keyword.throwArity (`


What does your aggregation look like?


(defn init-aggregate [window]
  {:event-store {}})

(defn aggregate-fn [window state segment]
  (let [{command-id :command/id command-type :command/type :as command} segment]
    (-> command
        (dissoc :command/type)
        (assoc :event/type command-type
               :event/id (uuid/v5 uuid/+namespace-oid+ command-id)))))

(defn apply-event [window state event]
  (update state :event-store assoc (:event/id event) event))

(def aggregate
  {:aggregation/init ::init-aggregate
   :aggregation/create-state-update ::aggregate-fn
   :aggregation/apply-state-update ::apply-event})


The problem is that :aggregation/init’s value should be a function


i.e. without the :: in front of each of the values


Silly me. Thanks!


Could it be that :aggregation/init is not invoked in onyx-local-rt? I put a println inside and I see nothing and my state starts as nil instead of the return value.

(defn init-aggregate [window]
  (println "init-aggregate" window)
  {})

(defn validate [window state segment]
  (let [{command-id :command/id command-type :command/type :as command} segment]
    (-> command
        (dissoc :command/type)
        (assoc :event/type command-type
               :event/id (uuid/v5 uuid/+namespace-oid+ command-id)))))

(defn apply-event [window state event]
  (assoc state (:event/id event) event))

(def aggregate
  {:aggregation/init init-aggregate
   :aggregation/create-state-update validate
   :aggregation/apply-state-update apply-event})


Yep, looks incorrect to me


It should be calling the init fn here as the second arg for the or, rather than using window/init


:window/init is valid as well. Are there any precedence rules here?


My understanding of window/init was that it is for use by the aggregation fns themselves. So your aggregation init fn might lookup the window/init value when initialising. To be honest it's a little confusing and I don't see any aggregations that use it. We may have to wait for @michaeldrogalis to come online to clarify it
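
Something like this is what I pictured (purely a sketch; the `{}` fallback is my assumption, not Onyx's behaviour):

```clojure
;; Sketch: the aggregation's own init fn consults :window/init itself,
;; falling back to an empty map (the fallback is an assumption)
(defn init-aggregate [window]
  (or (:window/init window) {}))
```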


Hello, I have made a small adaptation to the onyx.plugin.input-test in order to test the effect of varying numbers of partitions. Up to 6 partitions works but for 8 the job hangs at: Enough peers are active, starting the task


If you want I can provide the adapted code


But I wonder why it does not work at this number


@eelke I assume you mean kafka?


yes, apologies


I’ll have a look at it


@lucasbradstreet There's something interesting in onyx.windowing.window-compile/resolve-window-init. Can't say I understand it fully.


Ok thank you!


You need the adapted code?


I’ll give it a go. There’s only a few variables to fix. I should really pull the number of partitions out into a let binding to make it easier to test though


Ok cool, and I agree


Are you seeing this at all? "16-11-16 10:20:17 lbpro WARN [onyx.messaging.aeron.publication-manager:47] - Writing nil publication manager, likely due to timeout on creation."


If you are asking me, no I don't, but I have seen it before.


OK, definitely having problems with 8 partitions, but it might be a different issue


Ah interesting.


For the other issue I believe I adapted some settings with help of


But I am very curious what other issue this might be


@eelke this is definitely a bit odd. At first I thought it might be to do with the way take-segments! is used, since this test doesn’t actually make the job complete via a :done message


I have to go to dinner, but I’ll have a look at it when I get back


Yeah, I noticed too that there is no :done message. However, if I do send a :done message I get `clojure.lang.ExceptionInfo: :done is not supported for auto assigned kafka partitions. (:kafka/partition must be supplied)`


But bon appetit


Right, yes. You currently can't (and possibly never will be able to) use :done when reading over multiple partitions


How can I know the group-id an aggregation is initialized in? I need to init the aggregate from DB according to group-id. :aggregation/init is passed only the static window definition map.


Hmm. I don’t think you currently can


Can you give me an example of how you would want to initialise the key, depending on a particular group-id?


My group-id is the CQRS transaction boundary, or the specific stream of events that together make up the current aggregate state.


How are you initialising the aggregate from the DB?


Just trying to nail down the use case


It's just like how the aggregate in Onyx applies state changes from BookKeeper. I read all the events related to the group-id from the beginning and apply them one by one with my own logic, and only then can I process further commands (segments) coming in.


I'm simulating a bank account with event sourcing. Segments have a :command/type (`:deposit` or `:withdraw`). The aggregate checks if the account has enough money to withdraw and updates the state.
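
Roughly like this (a sketch; `:command/amount` and `:balance` are assumed names, not an actual schema):

```clojure
;; Sketch of the deposit/withdraw logic described above
(defn apply-command [state {:keys [command/type command/amount]}]
  (case type
    :deposit  (update state :balance (fnil + 0) amount)
    :withdraw (if (>= (get state :balance 0) amount)
                (update state :balance - amount)
                state)))  ; not enough money: reject, state unchanged
```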


That makes sense


So each bank account is a group and has its own window and aggregate.


I don’t have a good solution for you, so it seems like something we should add


Meanwhile I'm prying around the code looking for a workaround or if it's straightforward to PR.


There doesn't appear to be a trivial fix, without changing the aggregation function arities or assoc'ing into the window map


can a lifecycle function implement the aggregate better, or communicate with the aggregate/window?


The tough part is that a new group key can come in at any time


Maybe I can do it on the first segment the aggregate processes
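
Something like this, maybe (a hypothetical sketch; `load-events!` and the `:group-id` key are made up, with the DB read stubbed out):

```clojure
;; Hypothetical workaround: hydrate state when the first segment for a
;; group arrives (i.e. when state is still nil)
(defn load-events! [group-id]
  ;; would fetch this group's event history from the DB; stubbed here
  [])

(defn apply-event [state event]
  (assoc state (:event/id event) event))

(defn hydrate [state segment]
  (or state
      (reduce apply-event {} (load-events! (:group-id segment)))))
```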


That is definitely a possibility, and was one of my first thoughts


Yup, having another look at it in now 🙂


I was also wondering if maybe the same issue occurs in the version.


Does this happen during a transactor failover? I tried to kill a transactor and bring it up locally, and everything worked fine, so I'm a bit baffled. I know it goes down every time we have a transactor failover, will have to setup staging to simulate it, and see what the problem is.


I have also tried the same test but with using :kafka/partition


with the same result


@greywolve my best theory is still that the deref is never being notified that the write has failed. Not sure what datomic does there, but I think the deref is blocking forever


There's a default timeout though, but perhaps it isn't respected in certain cases


Oh yeah, I did look that up at some point


Wish I knew where it was getting stuck


@lucasbradstreet @greywolve It should be possible to figure that out with a thread dump


I haven’t been able to reproduce it


True. @greywolve if you could get a thread dump that would be very useful


I'll see what I can do 🙂


What's the difference between the "onyx" and "onyxplatform" repositories on Docker hub? It looks like "onyx" only has the onyx-dashboard image while the other has images for other onyx things, but they're all a few months old.

onyx is definitely official. It’d be nice to have everything on onyxplatform though. onyx-dashboard is the only thing that I can think of that we really distribute via docker


We should consolidate those


Ah right, all the images that we used with BookKeeper


And the Aeron sidecar


Yeah. I think we should move onyx-dashboard over to onyxplatform for consistency


@eelke I think you have discovered a weird bug that may not be kafka related. I am not far enough along to say for sure, but there appears to be something else going on


Ah ok, interesting. I wonder what the bug may be. I think I have noticed the same problem when running in production, though I'm also not far enough along to say for sure. What do you think might be the way to proceed?


I’m tracking it down


You are sure we are experiencing the same issue right?


input-test works with 7 partitions, completely blocks with 8


I think it’s the same thing you reported


sounds like it


maybe worthwhile to see if the same thing happens with 0.8?


At this point it appears to be completely unrelated to kafka


@eelke What we’re witnessing is use of alts!! breaking down. It’s very very weird


both take-segments! and internal alts!! are getting blocked for no good reason. I have rewritten them both to use poll! instead, and everything is working nicely with 8 partitions
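
The shape of the rewrite, roughly (a sketch of the idea, not the actual patch):

```clojure
(require '[clojure.core.async :as a])

;; Sketch: take n values with non-blocking poll! plus a short sleep,
;; instead of parking on alts!! (which coordinates via a shared thread
;; pool). Spins until n values have arrived.
(defn take-n! [ch n]
  (loop [acc []]
    (if (= n (count acc))
      acc
      (if-let [v (a/poll! ch)]
        (recur (conj acc v))
        (do (Thread/sleep 1) (recur acc))))))
```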


@lucasbradstreet I now see that only the :trigger/sync function has access to a potential db connection, meaning that an aggregate can't rely on any external state, so I can't initialize state from the db.


I don’t know why this is happening. there could be a concurrency bug in alts!!


@yonatanel yes, that’s true. We didn’t really anticipate any work in the window requiring any connections, etc. We are possibly wrong about that


wow ok, where in the code does this happen?


@lucasbradstreet This is quite normal for an event-sourced aggregate. Perhaps aggregations are not the way to go, but they look so similar to, and such a good match for, my use case.


@yonatanel can you create an issue with the rationale/example? @michaeldrogalis and I will discuss it and will implement it if we think it’s a good idea (it probably is)


@eelke we use alts in aeron/receive-messages and in the take-segments! (along with several other places)


@eelke can you add this property to your project and try the test again? "-Dclojure.core.async.pool-size=16"
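
e.g. in project.clj (assuming Leiningen):

```clojure
;; core.async's fixed dispatch pool defaults to 8 threads; raising it
;; gives blocked alts!! callbacks more room to make progress
:jvm-opts ["-Dclojure.core.async.pool-size=16"]
```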


Unbeknownst to me, alts!! appears to be doing some weird thread sharing stuff to coordinate, which means you can get stuck. Maybe this is reasonable. I’m not really core async’s biggest fan these days


Is there any documentation on the various images in the onyxplatform Docker repository? I haven't been able to find anything


@stephenmhopper There are some docs, but the images can for the most part be replaced by existing official images.


Github is giving me the pink unicorn but I’ll provide a link when it comes back up.


okay, thank you


Pink unicorn for me too


I am actively working on updating the BookKeeper, Zookeeper and Aeron sidecar images today though.


Yeah, I was getting the unicorn too a minute ago


Thank you


But I am surprised nobody else has run into this problem


For me this addition to the project works, but will you follow up on this bug?


@eelke it’s not so much a bug as a configuration issue, but we should possibly re-write parts to not use alts!!, seeing as it’s susceptible to these sorts of issues


@eelke: I'll continue to investigate it further.


Great, thanks for the help, and keep me posted


@stephenmhopper This is the repo for onyxplatform/zookeeper, it was created when Kubernetes stateful services (PetSets) were alpha, but I believe the API has remained relatively stable since then. The same goes for Bookkeeper here


I would say the most interesting part of those repos is no longer the containers, but the Kubernetes manifests.


However, I must say that as a newcomer to Onyx, I'm feeling very overwhelmed


Distributed systems, by their nature, have a lot of moving parts, especially when you bring deployment into the mix. If you have any questions, feel free to ask.


I definitely recommend a measured approach to all of this. My workflow usually goes:
1. Play at the REPL with regular clojure functions and test data.
2. Write a test (like in the template) and use the clojure functions in my Onyx job.
3. Iterate until I’m happy.
4. Write a transform (like in the template) to swap out the test input/output for production input/output.


yep, that makes sense to me


I think I'll just start my barrage of questions and see where that gets me


@gardnervickers @stephenmhopper 1.5. Use onyx-local-rt instead of the full platform!


First off, if there is an onyxplatform/docker-zookeeper image, is there a reason for the onyx-template to use wurstmeister/zookeeper in the docker-compose.yaml instead?


Yes, someone is actively managing that image; we’re not actively managing onyxplatform/docker-zookeeper.


onyxplatform/docker-zookeeper Has some special features to facilitate running an ensemble.


Now I'm more confused


We don’t provide a Zookeeper image for docker-compose because there are ones already out there.


Under what circumstances would I use the docker-zookeeper image instead of the actively maintained one?


If you needed to run Zookeeper as an ensemble in Kubernetes


But even then, in the 4 months since that image was last updated, there are better ones out there.


So I should just use one of those instead if I'm running on Kubernetes?


okay, I'll look into that


Are you planning on deploying to Kubernetes?


I haven't used it before, but I was going to evaluate it. I've heard good things about Mesos before and based on what I've read in the user guide, it sounds like people typically use Mesos / Marathon and Kubernetes is less frequently used in the Onyx community, no?


Onyx takes no stance either way. I believe more members of the community are running on Mesos though.


I am heavily biased towards Kubernetes though, any workflows around deployment I build will be targeted at that platform.


I've spent a lot of time working with Hadoop, Spark, and YARN. I'm really looking forward to getting away from the plethora of classloader issues inherent in those tools. If I'm understanding things properly, Mesos or Kubernetes will provide similar facilities to YARN in terms of scheduling and resource negotiation, no?


Kubernetes will handle resource scheduling for you, yes. That scheduler is pluggable as well.


Also, in the Hadoop world, all of your apps on a single cluster must require the same version of Hadoop and its dependencies (typically as "provided" dependencies which are present on all nodes). With Onyx, however, it seems like I just bundle Onyx with my app and deploy that uberjar out to Mesos / Kubernetes thereby allowing me to have multiple apps running different versions of Onyx, but on the same cluster. Is that accurate?


Yes, but I think as of a few updates ago, all Onyx peers participating in the same job must be running the same Onyx version, which makes sense.


You can have two different versions of Onyx running two different jobs on the same K8s cluster however


Is K8 the standard abbreviation for Kubernetes like i18n is for internationalization?


Yup, should have clarified that


cool, I like it


Is there any sane way with Onyx to support a dynamic workflow without throwing tons of hardware at building a huge cluster? Namely, I have a situation where lots of users are going to end up having unique tasks in their workflow, and thus I cannot have one workflow for all users. The workflows are similar to what you might see when running command lines with unix pipes: there are a bunch of different steps chained together with different input properties per pipeline. In other words, the possible number of tasks is high, but fixed, and there are tons of combinations that are unique to each user.

A lame way I can think of doing it is to set up a generic workflow that looks up each function it needs to run, then submits again to a queue/log (ex: Kafka), continuing the process until it reaches the end (database/core.async channel). There would be considerable overhead here: it would be harder to interrupt, require more intermediate state, and add considerable lag due to all the round trips, so I don't like it.

Beyond that, I'm not sure what to do except to push the execution onto the client itself (browser or other servers), which kills all benefits of Onyx and throws out any processing guarantees in the name of supporting dynamic behavior. Another approach would be some kind of fixed execution pool for tasks, doing everything in pipelines of go-blocks per segment with Kafka for ordering, but again this throws out Onyx and all the guarantees it provides.


Just to be clear, as far as the task pipeline, I mean something like: User A wants to take a chat message, filter it for profanity, make it all lower case, and then write it to a DB. User B also wants to take the same chat message, but he wants to make it all caps, linkify any hyperlinks, and output it to an s3 bucket. There may be some overlap or none at all between what each user is doing, other than perhaps processing the same stream or streams originating from the same database but composed of different data.


I think your use case is too specific for me to provide any kind of general advice. I would not shy away from running a large number of jobs, however.


But don't jobs require their unique set of resources (CPU cores, threads, etc.)?


They require a unique set of peers, which is just a couple threads on the JVM


If the threads will be parked 99% of the time, that’s not a problem.


Moreover, the duration and volume of these jobs varies a lot. Some may run pretty much forever constantly processing, while others may only run a few minutes, while still others may run forever (effectively) but not see data for days.


Are you both talking about the same thing?


Talking about the same thing?


Ahh shoot, misread the username in the previous post


Nevermind that 😄


Anyway, it makes sense to me that it would work with enough resources if the jobs were more short-lived, but effectively many of them are not.


So how are you going to tell Onyx how to dynamically set the task function?


something like {:fn :my-function :data [{:foo :bar}]}?


If you imagine we were processing unique jobs per Slack channel here: some of these channels are very busy, while others are not. If each had a unique workflow, during busy periods it seems to me the system would exhaust its resources pretty fast without throwing a ton of hardware at it, unless Onyx can park each job for a bit to allow everyone to work?


Well, I can tell Onyx which task function to use in a simple way via the standard approach: I have a JAR (or JARs) with all the provided functions, so the choices are fixed. The only thing that might vary is some per-task config, like the character to insert in a string for some kind of string-replace task.
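
e.g. something like this (hypothetical; the registry, keys, and segment shape are made up for illustration):

```clojure
(require '[clojure.string :as str])

;; Hypothetical sketch of that lookup: one generic task resolves the fn
;; to apply from a fixed registry, keyed by the :fn named in the segment,
;; so the workflow stays static while behaviour is chosen per segment
(def task-registry
  {:upper-case #(update % :text str/upper-case)
   :lower-case #(update % :text str/lower-case)})

(defn dispatch-segment [{task :fn :as segment}]
  ((get task-registry task identity) (dissoc segment :fn)))
```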


If I wanted something more dynamic as I described in the workaround, it would have to be something closer to what I'd do if I was using event sourcing - provide a command, lookup a handler somehow, execute it, and emit something (or not at all).


But how are you planning on making a task dynamically (in the middle of an Onyx job) switch to a new function?


In which case I'd essentially be replicating what onyx already does with function lookups in JARs


I actually may need to switch in the middle of a job, but I was going to concede that and not do it mid-run. What is dynamic is just the actual workflow composition. If you imagine there are 1000 tasks to choose from and a workflow can have any number of tasks, there are quite a lot of permutations, so I can't just keep a job running and hope the topology of the workflow matches one in a pool or something.


As far as numbers of jobs, it could be several hundred running at a time initially, and many more longer term. I'm thinking there's just no way I can really use Onyx to do this. That said, I'm using it for other things, but I was hoping not to have to resort to a cruder approach.


I mean if someone wants to give me a few thousand machines, no problem 🙂


I guess I don’t understand what you’re trying to do.


The best analogy I can give is the unix command line


If you can get away with not using Onyx in favor of something with less moving parts, then that is definitely a better route.


You have a fixed set of commands, but you can pipeline them in any number of combinations, with many different settings for the params/flags


The difference is the execution is long-running, not usually one and done like your typical unix command line usage


Parameterized workflows are the perfect use case for Onyx jobs.


moreover, the input data is much larger than what an ordinary unix pipeline would be doing since the input sources are streams


Running a high number of jobs is totally fine, and resources can be allocated by the scheduler for jobs that require more/less.


Other than pushing processing out to the user's machine, I think I would quickly run out of hardware resources with most approaches here, Onyx or not.


@hugesandwich This really depends on what the available functions need to do. If they are simple enough and each user workflow is completely declarative and represented as data, you might use an Onyx workflow with a single significant task that in itself runs the pipeline.


@hugesandwich I think we’re just speculating too early here.


@yonatanel This is what I was hinting at when I mentioned ES.


@gardnervickers I don't think it's speculation at all. Take my chat example, if you had thousands of chat channels and a unique onyx job processing each one, if a lot of chats were active, the system would quickly run out of resources.


Yup no doubt there.


that is under the caveat that each chat channel has a completely different workflow, in terms of the task functions, task ordering, and number of tasks


@hugesandwich I'm actually tackling ES right now. It's not that simple in Onyx. Read a bit back in the chat to see my use case


All I can think of is a single job that will deal with it and event sourcing seemed like a potential solution, but I feel it's going to make things very messy.


@hugesandwich why do you think a single job would handle this better than multiple jobs?


If I put Kafka topics in front of it, I can at least manage the load that way or even shard the load


I think a single job would be better only from the point of view I would know it is always running and processing, and not completely blocked from running. I've taken a look at the schedulers and that can help somewhat, but eventually there will simply be too many jobs.


What’s leading you to that conclusion?


So it doesn't have to be a single job, just a predictable number of jobs that I know are always running and just dependent on getting in line via Kafka or something else.


Onyx is capable of handling many jobs, and scheduling different amounts of resources for each job.


Yes but all the jobs can't run at the same time


They could of course if I had enough hardware


I think in @hugesandwich 's case, a job is part of the application level, so you can't just let Onyx handle everything under the hood with jobs.


Tasks in a job where there is no data to process consume minute amounts of resources on the peergroup they’re scheduled on. It is possible to run many jobs simultaneously across the same cluster/tenancy without starving your cluster of resources. (This is under the assumption that you actually have enough compute to handle your dataset).


I may be able to at least make it still run, albeit not at the speed I eventually want, by doing something with the job scheduler and some Kafka partitions. But Kafka has its own issues in that I can't spin up a topic per job or even a partition per job, because you'll eventually run into limits there too without separate clusters.


For aggregations, Onyx has its own event sourcing with BookKeeper, but if you want to persist events to your own event store and have rehydration of aggregates when new logic is deployed, you have to implement your own.


@yonatanel Trying to catch up on the conversation. There could definitely be a bug in the local-rt with windowing — that was the bit that required the most custom reimplementation.


EventStore has unlimited topics exactly for event sourcing:


@yonatanel I previously experimented some with storing events in Cassandra and bucketing them in rows as appropriate. It worked pretty well, but ES doesn't necessarily cover all cases. Worked pretty well for standard CRUD stuff and when I didn't care too much about real-time.


@michaeldrogalis Waiting for your input. I have a lot to think about before opening an issue for CQRS aggregates.


Am I the only one with strange slack bug when referring to someone?


@yonatanel I think ES with Onyx worked well for me when I was just doing your standard CreateUser and then emitting UserCreated. But in the case I have now where I have chains of tasks, it becomes a lot more complicated and starts to involve things like sagas which I feel like just overcomplicates things more than they already are for my specific use-case.


@yonatanel Heh, yeah Im getting that too. Thought I mistyped at first.


Did you have one particular question that needed answering? Still sifting through chat.


@hugesandwich We're probably on the same page. I'm having exactly your considerations.


@michaeldrogalis Mainly regarding initializing aggregates from DB and persisting state-update to my own external storage and not just to BookKeeper which onyx uses internally.


Right, window contents are meant to be periodically deposited into external, stable storage and purged from BookKeeper.


So what do you mean by "initializing aggregates from DB”?


@yonatanel My original "stack" about a year ago when I did a POC was Onyx + Kafka + Cassandra + Datomic. Datomic was the read-only DB essentially, Cassandra stored the events and used wide-rows, bucketing, and transactions for a few things. It was pretty fast. The big drawback I had was I felt like I was throwing out a lot of nice things about onyx when actually running commands in that way since a bunch of steps in my flow were bunched up into a single task (ex: command handler).


Did you have any in-memory state?


In-memory state where?


Michael, Sandwich, sorry, I will explain later. AFK a bit.


On a side note, just to confirm, I imagine there is no capability (for good reasons) to modify a running onyx job's workflow - i.e. the workflow itself is immutable? Or for that matter, at least pause it, modify it, then resume it again with the new modified workflow. But maybe I missed something.


Yes. It was intentionally designed to disable any modifications to the structure (workflow, catalog, etc) of a running job. It is effectively immutable after submission.


If you want to modify a job, kill the original job, redeploy any new code that’s needed, then submit another job.


OK I would imagine so. Modifying a running workflow opens all kinds of cans of worms.


The project I am starting to work on is pretty complex, and I am considering which parts I can implement in Onyx. At a high level, the user can define a flow of tasks over time, and I was thinking about whether or not to allow modification while these tasks run. If you've ever used a sequencer for music, it's similar to that. But instead of, let's say, midi commands or musical notes at a particular time, a task is defined to be allowed to run.


So most things just end up running forever, but some, let's say, might only run for an hour. I think I can translate it to Onyx by using flow conditions, or just checking inside the tasks themselves and submitting all possible tasks that can ever run as the workflow.


@hugesandwich So you’re dealing with a potentially high number of task combinations, potentially high volume, and wide variations in execution time?


Think a music player that plays tasks


the "instruments" are things that bring in streams - ex: just query a database and fetch a stream, pass on something from a websocket, whatever


So each item in the stream is fed into the flow of tasks that is mapped out by the "tracks" and played through the channels in a "mix"


If you have low latency requirements, you’re going to have to make a trade-off somewhere to give yourself something to work with design wise if you dont have the hardware to support that


Or pick a different tool that gives you fast ad-hoc task assembly.


The user can mute channels, play/pause execution, change the rate of flow of data, etc.


My dumb POC I did awhile back works with just core.async channels and a player loop that can transition into various states


Think something like, but obviously not music and far simpler 🙂 If you imagine the colors and bars are tasks/groups of tasks


and the whole thing represents a job


Then you're controlling it with some sort of player/mixer setup, again simpler, less complicated, more data-focused. I'm simplifying a lot and stretching this analogy thin at this point, but yeah there's a visual interface for both designing workflows and interacting with them while they run to control aspects of execution and flow.


I'd thought maybe I'll just limit it to small data or unreliable delivery, at least for a long while if I can't hack some sort of compromise with something like onyx to help with parts of the streaming and stream processing.


@hugesandwich have you considered akka (Akka Persistence) or any actors implementation?


@yonatanel Yes, I've actually been investigating whether I should dump core.async in favor of actors. I despise Scala, but maybe I'll have to just work with it. The other related option is doing at least part in Erlang.


To be more accurate, I guess it's not so much dumping core.async as looking at taking the POC to reality that way, rather than via some of the other alternatives


There are actors in clojure but without persistence. It might suit you:


Ah I remember this project awhile back, but I never really used it in a real project


My core.async implementation has this whole inbox/outbox go-loop sandwich thing on a lot of what I am doing. So that is what made me think of actors.


The trick is to process a new command only after persisting the current event to durable storage and applying it to yourself.
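
Roughly (a sketch; the atom stands in for a real durable event store, and `decide` is a toy placeholder):

```clojure
;; Sketch of persist-then-apply: decide on an event, write it durably,
;; and only then fold it into in-memory state
(def event-log (atom []))  ; stand-in for durable storage

(defn persist-event! [event]
  (swap! event-log conj event))

(defn decide [state command]
  ;; placeholder decision logic
  {:event/type :noted :event/data (:command/data command)})

(defn handle-command [state command]
  (let [event (decide state command)]
    (persist-event! event)   ; durable first
    (conj state event)))     ; then apply to in-memory state
```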


<@U0DHHGAEP> Did you actually need to run at high scale? Or was Onyx’s programming model the thing you wanted?


@michaeldrogalis Yes, I was pleased to see that as I had long been thinking of doing exactly that myself. I've considered it if I push processing on to the client.


I also considered multiple processing models. Like paying customers would pay for the server resources and be able to do more at scale.


Then non-paying customers/users who don't need reliable processing at scale can just muck around with something simpler, perhaps that can be somewhat portable when they are ready to run at bigger scale/pay 🙂


That way at least someone pays for the machines required


Another perspective of what I am trying to do: think of it as broadcasting data streams. The broadcaster can muck around with how the stream is processed, at what rate, etc., and the users just consume the results in various ways and view it. If I want to sound like a hopelessly bs marketing guy, "Twitch for data streaming."


Honestly I have a lot of different ideas and I expect to not do most of them, but I've at least messed around enough to find many cool things to do. Now I just need to do them for real and first figure out what is feasible before I waste months on the wrong path.


First I would separate the broadcasting problem from the live processing


Ideally, I want something to write out the stream somewhat in advance and broadcast out the results like you would in a chat, twitch stream, whatever.


Yes, the broadcasting would just be a straight linear read sent to users. If lets say you were doing the equivalent of building a chatbot, it would be stupid at scale to do all the processing on receipt of the message in a websocket receive or send server side


Instead, I'll probably just have a Kafka consumer reading out from a topic partition and sending that over a socket, or for history, just querying the backing store that feeds Kafka.


So for some of those things, Onyx works pretty well if I need it.


If it makes sense, any broadcaster with a "mix" doing work on its input data would end up needing an Onyx job because each mix is completely different. The mix playback may also not be interactive always and instead be something automated running forever.


@hugesandwich That sounds like a fun project


It would be even more complicated if I allow the broadcaster to mess with the tasks while the mix is "playing" and yet more when controlling the data flow. It's easy to control perhaps the "master" flow out, i.e. rate limit the output that eventually gets sent to the viewers, but doing it during the processing is significantly more complex.


Frankly, without knowing what exactly you're trying to do it's hard to comment, but anyway I have a feeling you don't need a new Onyx job for each mix.


@stephenmhopper Thanks. Still a lot to sort out and most likely cut or redesign. The inspiration is I wanted something to control the flow of data over time and I haven't found much good out there. Secondly, I wanted something to be able to muck with it interactively. Essentially modern music production software like Logic Pro X, Cubase, ProTools are exactly that, but they operate on musical data instead of something like a map.


Ah yes, the ol' "just use a map" idiom wins again!


Also was somewhat inspired by old trackers as in S3M, MOD, XM


I originally had this crazy concept with things that amounted to a whole OS or Emacs but for working with data, but far too big in scope and even more nerdy than this


Again though I'm really stretching these analogies and they probably don't make sense without a long conversation that no sane person would want to have with me 🙂


@yonatanel I'm trying to precisely eliminate the job per mix thing. I had thought of doing it somehow with ES but couldn't really wrap my head around doing it all over time, with chains of tasks, and so on.


@hugesandwich Changing the mixer while it's playing might be as simple as mixing itself if you consider the mixer as an "actor" with inputs that are both data and commands to change itself, all on the same inbox.


I suspect for this particular piece I may just not use Onyx, but I am still left with the question of what else. If there's a proper/good place to talk about this sort of thing that someone can point me to, I'll refrain from further polluting things here with my nonsense.


the gitter channel of fun-cqrs perhaps?


@yonatanel I will have to think about that more, my brain is fried. But maybe it will also help if I break down the primitives that the user interacts with a bit.


First there are channels. Not to be confused with core.async channels, I mean something more like audio channels. Their job is to pass data through them, potentially altering the rate of flow, exhibiting back pressure, applying flow-related "effects," and so on. Channels are chained together with each other to pass on data much like you would through an onyx workflow.


Next there are tracks. A track is assigned a channel. A track is more of a virtual construct that just groups tasks together and serves as a container for laying them out over time/frames. Tasks inside a track are handed data from channels. All tasks in the same track receive data in parallel by default in my current implementation.


There's a scheduler which handles the exchange of data between the channels and the tasks, routing it in the correct way and only to valid tasks


Tasks themselves are really just go blocks with an inbox and an outbox. The output from tasks is collected and fed back into any channels and routed accordingly, the same as an audio mixer does.


Eventually, everything ends up at some leaf channel which amounts to an output like a core.async channel, a database, etc., just like output tasks in Onyx


Overall, there's a player loop processing all of this, ticking at some controllable rate and sending in the processing time as you would in a game loop. If there's any sort of lag or whatever, the scheduler can correct accordingly for taking outputs or providing inputs.


And there are some behaviors around channels: wait for all tasks in a track, wait for only one (drop all the others), wait with a timeout, etc., to respond to back pressure according to what the user wants


There are some other capabilities, like moving processing of a track to a particular machine, doing a "comp" to merge tasks and treat them as a transducer, etc. They're probably unimportant, but I'm mentioning them to add that there's a reason tracks exist besides just visual organization


Finally I suppose it's worth mentioning that all these relationships are not "hard" in the sense of nested objects, but rather just stored mappings in a central atom that can be changed at runtime and responded to on each loop tick. In that way, we can redirect output on demand of even a running task without worrying about what channels and tracks and so on are doing. Likewise, we can reassign channel flows and so on too.
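A minimal sketch of that central-atom idea, with invented names (none of this is from a real codebase): relationships live in one atom of mappings, so a redirect is just a `swap!` that the player loop picks up on its next tick.

```clojure
;; Hypothetical sketch: channel/track relationships held in one central atom
;; rather than as nested objects, so they can be re-pointed at runtime.
(def routing
  (atom {:channels {:ch-1 {:out [:ch-2]}}        ; channel -> downstream channels
         :tracks   {:track-a {:channel :ch-1     ; track -> assigned channel
                              :tasks   [:t1 :t2]}}}))

(defn redirect!
  "Re-point a channel's output; the player loop sees the new mapping next tick."
  [ch new-outs]
  (swap! routing assoc-in [:channels ch :out] new-outs))

(redirect! :ch-1 [:ch-3])
```

The design choice here is that nothing holds a direct reference to anything else, so even a running task's output can be rerouted without the task knowing.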


haha. Also I'm explaining badly, but you made it this far.


It works, but I'm at this point of wanting to kind of junk things and move forward with something real that is achievable. Nothing I'm coming up with doesn't already exist in the audio world. It's just translating processing an audio signal to processing a data signal instead. For most end-users, they're just consuming data, viewing it, chatting while interacting with the results, whatever. That's completely separate.


@michaeldrogalis Regarding initializing an aggregate from DB, I was referring to rehydration of the aggregate from some durable event store instead of BookKeeper, and also persisting events to that store as they are created and before returning from :aggregation/apply-state-update.


For example, when an aggregate implements a bank account.


@yonatanel Ah, yeah the model doesn’t really work like that. Once window contents are flushed away from BK via a trigger sync, that data isn’t intended to enter the window again. If you need to access it, would recommend taking the union of the window contents + whatever is on disk after sync


I mean, you could do that, it’s just not how it was designed. It doesn’t seem to be a problem though; :window/init should do the trick
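The "union" approach described above can be sketched roughly like this. Everything here is placeholder naming: `db` stands in for whatever store the `:trigger/sync` function writes to, and `read-synced-state` is a hypothetical lookup over it.

```clojure
;; Sketch of the "union" approach: merge what a trigger sync has already
;; flushed to durable storage with the live window contents.
(defn read-synced-state
  "Placeholder lookup of state a :trigger/sync fn has already persisted."
  [db aggregate-id]
  (get db aggregate-id {}))

(defn current-aggregate
  "Union of previously synced state and the still-open window contents."
  [db aggregate-id live-window-contents]
  (merge (read-synced-state db aggregate-id)
         live-window-contents))
```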


I discussed that with Lucas. :window/init has no access to db connections.


it only sees the window definition.


Sorry, I meant :aggregation/init which is a function. :window/init is just a static value which I can't initialize per bank account.
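To illustrate the split being discussed, a sketch assuming the aggregation map shape from earlier in the thread: `:aggregation/init` is a function of the window map, while `:window/init` is a plain value inside that map, so one plausible convention is for the init fn to fall back on `:window/init` when it is present.

```clojure
;; :aggregation/init receives the window definition and may consult the
;; static :window/init value, defaulting otherwise.
(defn init-aggregate [window]
  (or (:window/init window)
      {:event-store {}}))

(def aggregate
  {:aggregation/init                init-aggregate
   :aggregation/create-state-update (fn [window state segment] segment)
   :aggregation/apply-state-update  (fn [window state event]
                                      (update state :event-store
                                              assoc (:event/id event) event))})
```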


We should amend that to also take the Event map


Almost every time I've omitted the Event map somewhere in the API, I've ended up regretting it


People love their state


Yeah. Can you open an issue and we’ll discuss making a change there?


Another related thing: suppose rehydrating the aggregate takes time. Is it possible not to wait for the first new command to trigger the reprocessing, and instead do a "warm up", or even just replay the events for testing/staging etc.?


And even then, suppose I want to replay my whole history of all aggregates. There's the issue of ordering which could be resolved in a window, but then the window has two responsibilities, one for ordering events and one for keeping the aggregate state. Not sure what to think about that.


I would try to rethink the design to not hold the entire aggregate inside the window contents, and instead use the approach I described above with taking the union


Windows aren't meant to contain the entire state all the time.


Oh, of course. I'm referring to a case where I need to reprocess events from the beginning for a new feature. Any snapshot I have is invalid in that case.


@yonatanel I'm trying to parse out what you are doing exactly, but if I understand somewhat you are doing something with ES to roll-up an aggregate's state by processing its events. And it sounds like you want to reprocess all aggregates from t=0 and roll-forward their state 1 by 1.


Yes. It's also how I want to create new materialized views when they are needed.


it depends on your storage, but there are a few approaches I've tried for that


@michaeldrogalis BTW the aggregate is relatively small in size and is a function of the history, not the entire history itself.


The naive approach is simply to query all the events in a single task that gets them all in order from the DB


If the event list is huge per entity or the query would take a long time to return or process, you can at least chunk it out, snapshot, then move on to the next chunk
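That chunk-then-snapshot loop might look something like this. All names are hypothetical; the store is an in-memory atom standing in for a real database, but the shape is the same: page through the events in order, fold each page into the state, and snapshot between pages so a crash never forces a full re-read.

```clojure
;; Hypothetical chunked replay with intermediate snapshots.
(defn fetch-events-page
  "Placeholder: read one ordered page of an aggregate's events."
  [store aggregate-id offset page-size]
  (->> (get-in @store [:events aggregate-id])
       (drop offset)
       (take page-size)))

(defn save-snapshot!
  "Placeholder: persist intermediate state so replay can resume mid-stream."
  [store aggregate-id state offset]
  (swap! store assoc-in [:snapshots aggregate-id]
         {:state state :offset offset}))

(defn replay-aggregate! [store aggregate-id apply-event init-state page-size]
  (loop [state init-state, offset 0]
    (let [events (fetch-events-page store aggregate-id offset page-size)]
      (if (empty? events)
        state
        (let [state'  (reduce apply-event state events)
              offset' (+ offset (count events))]
          (save-snapshot! store aggregate-id state' offset')
          (recur state' offset'))))))
```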


If you have a lot of aggregates that is a lot of queries of course


Beyond that, you can, via Onyx or something else, just push everything into a Kafka partition. It will be strictly ordered in the partition, so if you produce the events in order to Kafka, you can also consume them in order that way. Your consumer, though, will have to deal with resuming, fault tolerance, etc. If you wanted some parallelism, you could split the aggregates into multiple partitions.
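The per-aggregate ordering idea rests on Kafka only guaranteeing order within a partition, so the split works as long as all of one aggregate's events land in the same partition. A sketch of the standard key-to-partition mapping that most producer clients implement for you when you give each message the aggregate id as its key:

```clojure
;; Hash the aggregate id to a stable partition; events for the same
;; aggregate always land in the same partition and so keep their order.
;; (Clojure's `mod` is non-negative for a positive divisor, so negative
;; hashes are safe here.)
(defn partition-for [aggregate-id n-partitions]
  (mod (hash aggregate-id) n-partitions))
```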


Perhaps this is where Onyx shines, streaming the whole history to a strong cluster without interfering with production, windowing for order and building aggregates, snapshot to external store inside the :trigger/sync function, and when close enough to current time, switch to production and start from the acquired snapshots


the window though will have to have some kind of cut-off so it can trigger to then update downstream


and to prevent it from getting huge


@hugesandwich I think in Onyx once you consume from Kafka the order is not guaranteed in the next task in the workflow.


worst case you could again chunk out the window and keep snapshotting


@yonatanel Yeah, you lose the order when it makes its way into onyx potentially. What I am saying is to process from Kafka outside of Onyx some other way


At most in that case you'd use onyx to hydrate kafka with a full query for each aggregate, that's all I was saying in relation to Kafka + Onyx


I agree it's better to use Onyx, but if you're windowing you have to then be prepared you won't get a state update immediately until the window closes


or more accurately I guess, triggers


@hugesandwich You can trigger on every incoming segment if you must. But I think there's a way to trigger or at least to write a refinement strategy that only waits until the next in-order event (from the last trigger) arrives.
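A per-segment trigger would be a trigger map along these lines, assuming the standard Onyx windowing keys of that era (treat the exact key set as an assumption to check against the docs):

```clojure
;; Sketch: fire after every element and sync the refined state out.
(def triggers
  [{:trigger/window-id  :aggregate-window
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on         :onyx.triggers/segment
    :trigger/threshold  [1 :elements]
    :trigger/sync       ::persist-events!}])
```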


Well, I'm assuming you want windowing because your segments are arriving out of order and you want to deal with them in order, since further downstream you don't want your output to be inconsistent (as it would be if the events were applied to an aggregate out of order)


So if you have some state involving sequence or something, then maybe you can trigger more often or how you describe


i.e. knowing what the next id should be or some other criteria, so if you get a bunch of data before that id, you wait until you get it or fail


I'm not really in love with those kind of approaches though


I'd even settle for simply....not Scala 🙂


No problem programming in Scala, but it is quite possibly my least favorite language among the 20+ I've worked in


Speaking of which, I also looked again at Quasar/Pulsar. Still hard to justify given the community/contributors vs. Akka.


@michaeldrogalis I can fix the onyx-local-rt issue where :aggregation/init is ignored, but I need to check with you first. I read the regular Onyx code, and it seems it first does a one-time window compilation where it takes the init value either from :aggregation/init or from :window/init. In onyx-local-rt it takes either the current state or :window/init on each iteration. I could add a check for :aggregation/init there before :window/init, but that means a nil state at any point in time would invoke initialization again. I guess nil state is not permitted, but it's not mentioned anywhere.
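A sketch of the check being proposed (not the actual onyx-local-rt code; `init-fn` stands for the resolved :aggregation/init function): prefer the init fn, then :window/init, and only initialize when there is no current state. Treating only non-nil as "already set" is exactly the edge case raised above, since a nil state would otherwise re-initialize on every iteration.

```clojure
;; Proposed precedence: current state, else :aggregation/init, else :window/init.
(defn window-state [current-state init-fn window]
  (if (some? current-state)
    current-state
    (or (when init-fn (init-fn window))
        (:window/init window))))
```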


@hugesandwich It's hard for me as well to stomach Scala, but it's not that crazy to interop with, and there's a Java API for Akka which makes things easier.


@yonatanel I'll take a look tonight. Need to think on the aggregation init parameters, too. I'm not totally sold on whether that should use side effects to get its init value


@yonatanel That is what I have done in the past. Or just written straight Java. I've also done Clojure interop directly with Scala and most things are easily solved with an encoding/decoding protocol if you want to speak pure Clojure to/from your functions that touch scala


It's more an issue of library designs by authors of Scala libs for me usually. Quite a lot of Scala I see just abuses features and syntax, and just ends up being like a worse Java or fails to justify even being in Scala at all. Akka though is a nice library overall, so I'm more just referring to that world in general.


I created #cqrs for our more general woes.


OK, I'll try to leave myself in there. Do post though if you come up with a good event sourcing solution for Onyx.


@michaeldrogalis Sure. I'm not sure the design is supposed to support my use case; it's just superficially close to what I want. Good night.