#onyx
2016-12-14
rajdevireddy04:12:03

Hi, looking for some info on how to do error handling in a job. I read this documentation (http://www.onyxplatform.org/docs/user-guide/0.9.15/#_exceptions) and created a workflow element from one task like this, [:my-task :error-task], a catalog entry for :error-task that points to an error function that just dumps a message to stdout for now, and a flow condition just like what is in that document. It is working, but how can I tell Onyx to do the same thing (go to :error-task) for any error in any task? Something like a [* :error-task] in the workflow array and a flow condition like :flow/from * to :error-task if an error occurred in any task. Maybe it is already documented? If so, can someone please point me to it...I can't seem to find it. Appreciate your help...

lucasbradstreet04:12:44

@rajdevireddy you can do :flow/from :all :flow/to :error-task

lucasbradstreet04:12:03

You will need a connection from each task to the error task in your workflow, however. This is easy to expand, though.
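To make that concrete, here is a rough sketch of what it could look like on 0.9.x; the task names and the predicate/post-transform functions (:my-app.tasks/handle-error?, :my-app.tasks/transform-exception) are placeholders, not from this thread:

;; Every task that should route failures needs an edge to :error-task.
(def workflow
  [[:in :task-a]
   [:task-a :task-b]
   [:task-b :out]
   [:task-a :error-task]
   [:task-b :error-task]])

;; One flow condition covers all of them via :flow/from :all.
(def flow-conditions
  [{:flow/from :all
    :flow/to [:error-task]
    :flow/short-circuit? true
    :flow/thrown-exception? true
    ;; turns the thrown exception into a segment before it reaches :error-task
    :flow/post-transform :my-app.tasks/transform-exception
    :flow/predicate :my-app.tasks/handle-error?}])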

rajdevireddy04:12:37

Thanks a lot. Will do that.

yonatanel14:12:34

What's the best practice for initializing an aggregation from DB on the first segment of each group (because only then I know what to take from DB)?

yonatanel14:12:00

Perhaps in :lifecycle/after-read-batch of the windowed task, taking the time there to synchronously fetch from DB? Can I influence the first segment to include init data?

yonatanel14:12:54

Otherwise I need the aggregation to buffer segments until a special task finishes fetching from the DB and communicates that data to the aggregation via a special segment. If too many segments are buffered at the aggregation in the meantime, I will never get the initial data.

gardnervickers14:12:29

Initialize the aggregation as in initialize the window contents?

yonatanel14:12:12

More formally, I need behavior equivalent to returning the result of a DB fetch from the :aggregation/init function.

yonatanel14:12:45

Which you can't do because you have no external context in an aggregation, other than the sync function which comes later.

stephenmhopper15:12:19

I'm writing data from my Onyx workflow out to a db using onyx-sql. I want to group my writes into batches of 100 (or fewer). I assume I should use windowing to do this, no? Most of the examples I've seen use some sort of timestampery. How do I do this without timestamping all of my segments?

stephenmhopper15:12:56

Can I just use a global window in combination with a :segment trigger?

Travis15:12:05

@stephenmhopper Could you use a batch function?

Travis15:12:20

sounds like you can

stephenmhopper15:12:12

that's exactly what I need

stephenmhopper15:12:16

maybe that's not what I need: CompilerException clojure.lang.ExceptionInfo: :onyx/batch-fn? functions must return the same number of elements as its input argment.

stephenmhopper15:12:36

I'm trying to output {:rows segments}

gardnervickers15:12:22

@yonatanel Aggregations shouldn’t rely on external state/context because they need to be idempotent/pure-ish. If I remember correctly you’re trying to restore snapshots of your event log state to Onyx aggregations in order to update them. In order to fit into the aggregation pattern they need to be set up as a reduction anyway, so what’s preventing using the :aggregation/init function to load your whole existing event-state (or group states) as a seed value for your aggregation (reduction)?
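A minimal sketch of what seeding through :aggregation/init could look like on 0.9.x; fetch-snapshot-from-db and apply-event are hypothetical helpers, and a blocking DB call in init runs against the purity caveat above, so treat this as illustration only:

(defn init-from-db [window]
  ;; hypothetical blocking fetch, keyed off the window's id
  (fetch-snapshot-from-db (:window/id window)))

(defn create-state-update [window state segment]
  [:merged (apply-event state segment)])

(defn apply-state-update [window state [op value]]
  (case op
    :merged value))

(def ^:export seeded-aggregation
  {:aggregation/init init-from-db
   :aggregation/create-state-update create-state-update
   :aggregation/apply-state-update apply-state-update})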

Travis15:12:27

Might want to return the same number of segments and send them to a no-op function

Travis15:12:33

if it's the end of your pipeline

Travis15:12:11

We do this same thing for elastic

yonatanel15:12:56

I think I'll have one aggregation which is pure, and one aggregation/task upstream that is responsible for initialization, with all segments going through it

yonatanel15:12:41

The first aggregation will trigger a DB fetch, and the pure aggregation will get a snapshot segment to start with

gardnervickers15:12:50

Unfortunately at this time Onyx does not support sequential aggregations. There’s no way to trigger to another task.

stephenmhopper15:12:31

@camechis I'm not sure that I follow. I'm trying to write to a sql database with onyx-sql. What's the easiest way to batch my writes?

Travis15:12:28

ah, gotcha

Travis15:12:55

isn’t there a batch-size setting?

gardnervickers15:12:36

It’s batched in terms of {:rows [<r1> <r2> ...]} right now. I believe that for each segment in your :onyx/batch-size, one jdbc/insert! is run on that segment’s :rows vector.

stephenmhopper15:12:23

@gardnervickers Right. I'd like to batch my segments into groups of 100 or less to build that {:rows [<r1> <r2> ...]} structure before shipping them to onyx-sql

stephenmhopper15:12:31

What's the easiest way to make that happen?

gardnervickers15:12:50

Using a window. You will need to implement the SQL-writing part as a trigger call, however.

gardnervickers15:12:27

So you want semantics like, “Write to the DB if I’ve received 100 segments or it’s been 10 seconds"

stephenmhopper15:12:45

@gardnervickers yeah, that sounds about right

Travis15:12:06

yeah, sounds like you will need a window then
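For reference, a hedged sketch of that window-plus-trigger setup on 0.9.x; the task name :write-to-sql and the sync function :my-app.sql/write-rows! are made up, and the exact trigger keys should be checked against the docs for your version:

(def windows
  [{:window/id :sql-batches
    :window/task :write-to-sql
    :window/type :global
    :window/aggregation :onyx.windowing.aggregation/conj}])

(def triggers
  [{:trigger/window-id :sql-batches
    :trigger/refinement :onyx.refinements/discarding
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [100 :elements]
    ;; the sync fn receives the accumulated window contents and does the jdbc insert;
    ;; a second trigger with :onyx.triggers/timer and :trigger/period [10 :seconds]
    ;; would cover the time-based half of the semantics
    :trigger/sync :my-app.sql/write-rows!}])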

stephenmhopper15:12:11

Do we have any examples of Onyx jobs that use windows and do interesting things in the windowing task? So far, everything I've seen just calls identity

stephenmhopper16:12:51

Oh. Wait a second. When I set up windowing for a task, that task still just receives and emits segments as usual, and only the trigger sees/receives the output of the windowing, no?

michaeldrogalis16:12:01

identity is simply a pass-through; you have the opportunity to transform the segment before it’s windowed, but most of the time you don’t need or want to do that. The segment flows on to downstream tasks after that point.

stephenmhopper16:12:19

@michaeldrogalis So what's the preferred way to do batch writes with onyx-sql? Should I go the windowing route and do all of my writes from triggers? If so, can I still use onyx-sql? Also, in general, is there a way to group several segments together and pass them to a downstream task in Onyx?

michaeldrogalis16:12:07

@stephenmhopper You’re talking about writing to SQL in a task that’s not the leaf of a workflow?

michaeldrogalis16:12:25

i.e. it has children, to confirm?

stephenmhopper16:12:37

no, it's the leaf

Travis16:12:05

If it is, you could use a batch function and not use onyx-sql

stephenmhopper16:12:50

@camechis That's what I'm leaning towards right now

michaeldrogalis16:12:58

@stephenmhopper If it’s an output task, use the plugin exactly as intended. https://github.com/onyx-platform/onyx-sql#upsert-rows

michaeldrogalis16:12:24

Add that catalog entry, and the lifecycles below it, and you’re set. The plugin does the batching on your behalf.
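Roughly the shape of that catalog entry and lifecycle from the onyx-sql README; the connection details below are placeholders, so check the README linked above for the exact keys your plugin version expects:

{:onyx/name :write-rows
 :onyx/plugin :onyx.plugin.sql/write-rows
 :onyx/type :output
 :onyx/medium :sql
 :sql/classname "com.mysql.jdbc.Driver"
 :sql/subprotocol "mysql"
 :sql/subname "//127.0.0.1:3306/my_db"
 :sql/user "user"
 :sql/password "password"
 :sql/table :my_table
 :onyx/batch-size 100
 :onyx/doc "Writes segments of the form {:rows [...]} to the SQL database"}

;; plus the plugin's lifecycle entry for the same task
{:lifecycle/task :write-rows
 :lifecycle/calls :onyx.plugin.sql/write-rows-calls}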

stephenmhopper16:12:19

@michaeldrogalis Right, but when I was looking through the source for the plugin, it looked like it was creating a separate jdbc transaction for each segment. The idea is that each segment could have multiple entries in {:rows [s1 s2 s3]}. Given that my workflow is creating segments s1, s2, and s3, how do I create an Onyx task that turns those three segments into {:rows [s1 s2 s3]} before shipping it to onyx-sql? Right now, I'm sending onyx-sql three segments: {:rows [s1]}, {:rows [s2]}, {:rows [s3]} which I assume is creating three JDBC transactions in a case where I'd be fine with bundling them together into a single transaction. Does that make sense?

michaeldrogalis16:12:33

Got it. Yeah, ideally you’d be able to use a window to wrap up N seconds’ worth of segments and send them down to the next task, but that’s a 0.10 feature that’s not available yet. It’s close, but it’s not quite there. Using a batch-function as @camechis suggested will get you there.

michaeldrogalis16:12:11

The downside is that you have to handle the SQL writing yourself. @lucasbradstreet and I are discussing a change that will let you use output plugins from within the function position, which would alleviate that responsibility.
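A rough sketch of that batch-function approach, assuming clojure.java.jdbc and made-up names (db-spec, :my_table, the :my-app.sql namespace); the important parts are :onyx/batch-fn? true and returning exactly as many segments as came in:

(ns my-app.sql
  (:require [clojure.java.jdbc :as jdbc]))

(def db-spec {:connection-uri "jdbc:mysql://127.0.0.1:3306/my_db?user=user&password=password"})

(defn write-rows-batch!
  "Receives the whole batch of segments, writes their :rows in one transaction,
   and returns the batch unchanged so the output count matches the input."
  [segments]
  (jdbc/with-db-transaction [tx db-spec]
    (doseq [{:keys [rows]} segments]
      (doseq [row rows]
        (jdbc/insert! tx :my_table row))))
  segments)

;; Catalog entry for the task running this function:
{:onyx/name :batch-write
 :onyx/fn :my-app.sql/write-rows-batch!
 :onyx/type :function
 :onyx/batch-fn? true
 :onyx/batch-size 100
 :onyx/doc "Writes each batch to SQL, then passes the segments to a no-op leaf task"}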

stephenmhopper16:12:29

I already have the SQL stuff written, so this won't be an issue at all. And knowing that I'll be able to pass windows around in 0.10 makes me excited

stephenmhopper16:12:38

How's 0.10 coming along?

michaeldrogalis16:12:59

Technical preview is close, the build is doing well against long rounds of property testing, and it’s almost passing the majority of the original test suite. After the TP is out, we’ll proceed to Jepsen testing and do some straightforward performance improvements. Then we’ll be at an alpha. It’s been a big effort; it has more or less turned into a complete rewrite of Onyx’s guts.

michaeldrogalis16:12:24

It’s worth it though, we’re set up to have iterative computation to support machine learning without much more effort.

stephenmhopper16:12:05

ABS and passing windows around are the two big things on my radar. What can you tell me about this machine learning support?

michaeldrogalis16:12:53

@stephenmhopper Iterative computation is the primitive that’s needed for most ML algorithms. Pretty much the ability to efficiently do loops within workflows and recover from faults. Haven’t decided on what will get extended as far as the Onyx job information model though.

michaeldrogalis16:12:07

Probably will get integrated with flow conditions.

stephenmhopper17:12:11

That sounds cool. I'm currently evaluating Onyx for building pipelines for ingesting and pre-processing data for ML. Knowing that someday I'll be able to do ML on top of Onyx too is a big win

lellis17:12:11

Hi all! Is it possible to pass a function in a flow-condition like this?

(def flow-conditions
  [{:flow/from      :parse-log
    :flow/to        [:colaboracao.queroSerReceptor]
    :param/one      :fn-1
    :flow/predicate [:formiguinhas-service.onyx.datomic-plugin-utils/match? :param/one]
    :flow/doc       "Route to ::queroSerReceptor?"}

   {:flow/from      :parse-log
    :flow/to        [:colaboracao.queroSerDeslocador]
    :param/one      :fn-2
    :flow/predicate [:formiguinhas-service.onyx.datomic-plugin-utils/match? :param/one]
    :flow/doc       "Route to ::queroSerDeslocador?"}])

michaeldrogalis17:12:41

@lellis Flow conditions don’t support higher-order functions, no. That structure gets EDN-serialized and goes onto durable storage. You could resolve the value of :fn-* in a lifecycle and assoc it into the event map, though, as you have access to the event map inside your predicate definition.
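A sketch of that pattern with made-up names (the :my-app.flow namespace, match-receptor-calls), using a non-parameterized predicate that just pulls the resolved function out of the event map:

;; The lifecycle resolves the function once and merges it into the event map.
(def match-receptor-calls
  {:lifecycle/before-task-start
   (fn [event lifecycle]
     {:my-app.flow/receptor-pred (fn [segment] (= :quero-ser-receptor (:type segment)))})})

(def lifecycles
  [{:lifecycle/task :parse-log
    :lifecycle/calls :my-app.flow/match-receptor-calls}])

;; The flow predicate has access to the event map, so it can call the resolved fn.
(defn match-receptor? [event old-segment segment all-new-segments]
  ((:my-app.flow/receptor-pred event) segment))

;; Flow condition entry:
{:flow/from :parse-log
 :flow/to [:colaboracao.queroSerReceptor]
 :flow/predicate :my-app.flow/match-receptor?}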

rajdevireddy17:12:03

Hi all, a followup to my earlier question. I am trying to add one error task that handles errors from any task in the workflow. I have added workflow entries like [:my-task-1 :error-task] to my workflow vector for all tasks, and then a flow condition:

{:flow/from :all
 :flow/to [:error-task]
 :flow/short-circuit? true
 :flow/thrown-exception? true
 :flow/predicate :my-task-ns/handle-error?}
What I see now is that :error-task is being executed even when there is no error. And when there is an error, it runs the error task for all the tasks upstream of the one where the error occurred, as well as for that task itself. I must be doing something wrong but can't tell what it is. Any suggestions where to look?

hugesandwich22:12:13

Is there a sensible way that I can alter a function's parameters while a job is running? For example, let's say I have a function that increments a value by 1. At time t=1, the inc amount is 1. Now at t=5, I want to set my increment function to inc by 10. One way I can do this is to just send in the inc amount with the segment, i.e. {:my-num 5 :inc-amount 10}; however, that gets a little crazy as the number of tasks in the workflow grows. For instance, say I add a second, similar decrement task that also executes in the same flow. Now I need {:my-num 5 {:inc :inc-amount 5} {:dec :dec-amount 2}} where at the same t, the inc amount was 5 and the dec amount was 2. It would also be simple if I could generate these values algorithmically, and I do to some degree (e.g. linear interpolation); however, quite often my values are coming from users altering things while a workflow is running. If you imagined that a user could alter the increment amount, there's no possible way for me to interpolate what that amount might be, so it needs to be provided explicitly. What I'd do normally is have something like a loop/recur where at the top or bottom of the loop, I grabbed the new value, be it from an atom, core.async channel, wherever, and I'm good to go for the next processing iteration. My problem is I'm not sure that in a distributed environment like Onyx there is a sane way to do it without sending a map full of every possible step and every parameter. This really inflates the overhead of each segment, even if I drop the consumed task params from the map along the way. I suspect there's some clever way to do it, probably with windows. Any good ideas?

hugesandwich22:12:38

What I'd ideally like to do is something like sending a segment through that a given task reads, applies the change in a stateful way, and any data after that will be called with those params. That's assuming I'm doing this all in order, which to some degree I am via kafka.

hugesandwich22:12:52

As far as reading from another datastore like a DB at each task to go grab the latest param values, I think this would be too much overhead to the point where packing everything into the segment would still be faster.

michaeldrogalis22:12:44

You can alter :onyx.core/params with the tempo of each task lifecycle.
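A hedged sketch of that idea, assuming the task fn takes its amount as a leading parameter and a lifecycle refreshes :onyx.core/params from an atom before each batch; the names (:my-app.lifecycles/refresh-params-calls, inc-amount, :inc-task) are made up, and each peer resolves its own value:

(def inc-amount (atom 1)) ; updated elsewhere, e.g. from a control segment or an admin channel

(defn my-inc [amount segment]
  (update segment :my-num + amount))

(def refresh-params-calls
  {:lifecycle/before-batch
   (fn [event lifecycle]
     ;; whatever this returns is merged into the event map; params are
     ;; prepended as arguments when the task fn is invoked on each segment
     {:onyx.core/params [@inc-amount]})})

(def lifecycles
  [{:lifecycle/task :inc-task
    :lifecycle/calls :my-app.lifecycles/refresh-params-calls}])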

hugesandwich22:12:07

Thanks. I was looking at lifecycles earlier and I can't remember a few things (not in front of emacs at the moment). If I understand, I should be able to then send in a segment that has my alterations, read it during one of the lifecycles that occurs after I have the value of that segment, and change the task (structure doesn't matter, just as long as I can differentiate it from other segments)? And the change is visible to the next segment that arrives and across peers?

hugesandwich22:12:35

I have a second related problem, which is that I need to add more functions, and thus tasks, to my workflow over time. In other words, if I have a simple flow like in->inc->out, at some later point in time I may have in->inc->my new convert-to-string task->out. I assume that it's not really possible unless I spawn those tasks myself inside an Onyx task specifically for spawning one or more additional tasks, or stop the job and start it again with the new workflow.

hugesandwich22:12:08

Maybe a super awful way would be to run onyx-local-rt inside a task 🙂. But yeah, I'm thinking it's not a good fit for the problem space anyway, as it's also possible the order of any task can change.