
What's the recommended way to kill all jobs in an Onyx cluster?


I'm currently using this


but it seems the replica is not always up to date


@chrisblom: you'll need some measure of whether it's done playing back the log. In the past I've looked at the last log entry to see whether it changes over some timeout period, or checked whether the time on the last log entry is later than the time I made the call.
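A minimal sketch of both heuristics, assuming you already have some way to read the newest log entry. `read-last-entry` and every other name here are hypothetical placeholders, not part of the Onyx API.

```clojure
(defn wait-until-stable
  "Polls `read-last-entry` (a hypothetical zero-arg fn) every `poll-ms` ms.
  Returns the entry once it has stayed unchanged for `quiet-ms` ms, or nil
  if `max-wait-ms` elapses first."
  [read-last-entry {:keys [poll-ms quiet-ms max-wait-ms]}]
  (let [start (System/currentTimeMillis)]
    (loop [last-seen   (read-last-entry)
           last-change start]
      (let [now (System/currentTimeMillis)]
        (cond
          ;; No change for the whole quiet period: assume playback is done.
          (>= (- now last-change) quiet-ms) last-seen
          ;; Overall deadline reached: give up.
          (>= (- now start) max-wait-ms) nil
          :else
          (do (Thread/sleep (long poll-ms))
              (let [entry (read-last-entry)]
                (if (= entry last-seen)
                  (recur last-seen last-change)
                  ;; The log moved, so restart the quiet period.
                  (recur entry (System/currentTimeMillis))))))))))

(defn caught-up?
  "Second heuristic: the replayed log contains an entry written after the
  moment we made the call (both times in epoch milliseconds)."
  [last-entry-time-ms call-time-ms]
  (> last-entry-time-ms call-time-ms))
```

The timeout approach can return too early if the log is merely quiet, so the timestamp check is the stronger signal when entries carry a usable time.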


Can aggregations exert backpressure or do they have to update the input behavior through the sync function if desired?


@yonatanel what do you mean by the second “update” part?


Still thinking about it, but I mean that if the aggregation is struggling to keep up, it could signal that to the outside world from its sync function, and a Kafka reader could then wait. Is that necessary?


Hmm. I’m not sure. There will be natural backpressure as a result of onyx/max-pending, but it won’t notify whatever is reading the store you’re syncing to.


You could sync with both a timer trigger and a segment-count trigger, to get some consistency while also updating more frequently as the number of segments goes up. This could be a little wasteful, though, since both triggers could sometimes fire in a single batch. A combination trigger type could help there.
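For illustration, the two-trigger setup might look like this in a job's :triggers vector. The key names follow the Onyx windowing documentation of this era as best I recall, so check them against the docs for your version; the window id :collect-segments and the sync function ::write-to-store are hypothetical.

```clojure
;; Two triggers on the same (hypothetical) window. Whichever fires
;; calls the same sync function, so the store is refreshed either
;; every 500 segments or every 5 seconds, sometimes both.
[{:trigger/window-id  :collect-segments
  :trigger/refinement :onyx.refinements/accumulating
  :trigger/on         :onyx.triggers/segment
  :trigger/threshold  [500 :elements]
  :trigger/sync       ::write-to-store}
 {:trigger/window-id  :collect-segments
  :trigger/refinement :onyx.refinements/accumulating
  :trigger/on         :onyx.triggers/timer
  :trigger/period     [5 :seconds]
  :trigger/sync       ::write-to-store}]
```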


The idea would be to track both the time of the last trigger call and the number of segments since then; if either threshold is hit, fire and reset both.


You’d need to create a new trigger implementation to do that
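A minimal sketch of the bookkeeping such a combination trigger would need, written as pure functions. This is not Onyx's trigger extension interface; the function names and the state shape are hypothetical, and the code only shows the "fire on either threshold, then reset both" logic.

```clojure
(defn new-combo-state
  "Fresh state: no segments counted, timer (re)started at `now-ms`."
  [now-ms]
  {:segments 0 :last-fire-ms now-ms})

(defn on-segment
  "Counts one arriving segment and decides whether to fire.
  Returns [fire? new-state]; firing resets BOTH the count and the timer."
  [{:keys [segments last-fire-ms]} now-ms {:keys [threshold period-ms]}]
  (let [segments (inc segments)]
    (if (or (>= segments threshold)
            (>= (- now-ms last-fire-ms) period-ms))
      [true (new-combo-state now-ms)]
      [false {:segments segments :last-fire-ms last-fire-ms}])))

(defn on-tick
  "Called periodically with no new segment, so the timer can still fire
  when input is idle. Returns [fire? new-state]."
  [{:keys [last-fire-ms] :as state} now-ms {:keys [period-ms]}]
  (if (>= (- now-ms last-fire-ms) period-ms)
    [true (new-combo-state now-ms)]
    [false state]))
```

Resetting both counters on every fire is what avoids the double sync you get from two independent triggers.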


Aggregations affect the number of pending segments? I thought they were separate from tasks.


Well, somewhat: if your aggregations are lagging, the segments won’t get acked (they’re only acked after the aggregation calls), so no room is freed for your input tasks to read new segments.
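A toy model of that interaction, not Onyx code: an input can only have `max-pending` un-acked segments in flight, and segments are acked only after the aggregation processes them. The function names are hypothetical.

```clojure
(defn read-batch
  "Toy input task: reads up to `available` new segments, but never lets the
  number of un-acked (pending) segments exceed `max-pending`.
  Returns [segments-read new-pending]."
  [pending max-pending available]
  (let [room (max 0 (- max-pending pending))
        n    (min room available)]
    [n (+ pending n)]))

(defn ack
  "Toy ack: pending segments are released only once the downstream
  aggregation has processed them."
  [pending n-processed]
  (max 0 (- pending n-processed)))
```

With max-pending 10, a first read fills the window (`(read-batch 0 10 100)` gives `[10 10]`); while the aggregation lags, further reads return 0; once 4 segments are acked, exactly 4 more can be read.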


OK. Didn't know that.


Otherwise there’d be no fault tolerance in the aggregation feature


Makes sense. I didn't get to that part of the guide either :)


@michaeldrogalis I use job metadata for the job start time now. I find this better for global constants than using function params.


Thanks. Especially because the job metadata is very easy to access from within lifecycle functions, through the event map under :onyx.core/metadata.
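A minimal sketch of that approach, assuming the job map's :metadata key is what surfaces as :onyx.core/metadata in the event map. The lifecycle function, the calls map, and the :job-start-time key are hypothetical names for illustration.

```clojure
(defn inject-start-time
  "Hypothetical before-task-start lifecycle fn. The returned map is merged
  into the event map, so later lifecycle calls can read ::job-start-time."
  [event _lifecycle]
  {::job-start-time (get-in event [:onyx.core/metadata :job-start-time])})

(def metadata-calls
  {:lifecycle/before-task-start inject-start-time})

(def job
  {:workflow   [[:in :out]]
   :catalog    []   ; catalog entries omitted
   :lifecycles [{:lifecycle/task  :in
                 :lifecycle/calls ::metadata-calls}]
   ;; Job-wide constants travel with the job instead of as fn params.
   :metadata   {:job-start-time (System/currentTimeMillis)}})
```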


Where do people typically swap out standard input and output fixtures for core.async channels for testing purposes? I'm trying to test this simple job that I'm evolving:

(defn twitter-job [batch-settings twitter-config]
  (let [base-job {:workflow [[:in :out]]
                  :catalog []
                  :lifecycles []
                  :windows []
                  :triggers []
                  :flow-conditions [{:flow/from :in
                                     :flow/to [:out]
                                     :flow/predicate ::has-hashtag}]
                  :task-scheduler :onyx.task-scheduler/balanced}]
    (-> base-job
        (add-task (twitter/stream :in [:text :hashtagEntities]
                                  (merge twitter-config batch-settings)))
        ;(add-task (core-async-task/input :in batch-settings))
        (add-task (core-async-task/output :out batch-settings)))))

For my tests, I want the input stream to be a core.async channel. Obviously, I can just comment out the Twitter stream definition in my job and uncomment the input stream, but that doesn't seem like a best practice to me.


Should I make a separate function for creating a version of my job which is only run for tests or should I write my test in such a way that it just overrides the input/output entries in my job map?


@stephenmhopper The approach I take is to have a function that builds up the core logic of the job, then two separate functions that annotate the job based on what environment it will run in (core.async plugins or external data stores).
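A minimal, dependency-free sketch of that pattern using plain job maps rather than the template's add-task helpers. The function names are hypothetical, and `twitter-entry` stands in for whatever catalog entry the Twitter plugin documents. The core.async keys follow Onyx's core.async plugin, but a real core.async task also needs lifecycle entries that inject the channel, which are omitted here.

```clojure
(defn core-job
  "Environment-independent part of the job."
  []
  {:workflow        [[:in :out]]
   :catalog         []
   :lifecycles      []
   :flow-conditions [{:flow/from      :in
                      :flow/to        [:out]
                      :flow/predicate ::has-hashtag}]
   :task-scheduler  :onyx.task-scheduler/balanced})

(defn core-async-entry
  "Catalog entry for a core.async input or output task."
  [task-name task-type batch-size]
  {:onyx/name       task-name
   :onyx/plugin     (if (= task-type :input)
                      :onyx.plugin.core-async/input
                      :onyx.plugin.core-async/output)
   :onyx/type       task-type
   :onyx/medium     :core.async
   :onyx/batch-size batch-size
   :onyx/max-peers  1})

(defn annotate-for-tests
  "Both ends of the job become core.async channels."
  [job {:keys [batch-size]}]
  (update job :catalog into [(core-async-entry :in :input batch-size)
                             (core-async-entry :out :output batch-size)]))

(defn annotate-for-production
  "Real input; `twitter-entry` is a hypothetical plugin catalog entry."
  [job twitter-entry {:keys [batch-size]}]
  (update job :catalog into [twitter-entry
                             (core-async-entry :out :output batch-size)]))
```

Tests then call `(annotate-for-tests (core-job) settings)`, so the workflow and flow conditions under test are exactly the ones production uses.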


Do you have an example of that somewhere?


@gardnervickers Is this what you were hinting at?

(def base-twitter-job
  {:workflow        [[:in :out]]
   :catalog         []
   :lifecycles      []
   :windows         []
   :triggers        []
   :flow-conditions [{:flow/from      :in
                      :flow/to        [:out]
                      :flow/predicate ::has-hashtag}]
   :task-scheduler  :onyx.task-scheduler/balanced})

(defn twitter-job [batch-settings twitter-config]
  ;; Production: read from the Twitter stream
  (-> base-twitter-job
      (add-task (twitter/stream :in [:text :hashtagEntities]
                                (merge twitter-config batch-settings)))
      (add-task (core-async-task/output :out batch-settings))))

(defn local-twitter-job [batch-settings]
  ;; Tests: read from a core.async channel instead
  (-> base-twitter-job
      (add-task (core-async-task/input :in batch-settings))
      (add-task (core-async-task/output :out batch-settings))))


Cool. I like that approach. Thank you!


@akiel Excellent solution.


Hello, is there a recommended Kafka lib these days? I'm just starting out with Franzy, but I'm getting exceptions I don't understand, so I thought I'd ask here 🙂


@nha I would recommend checking out Kinsky. Franzy isn’t maintained anymore. We might cut directly over to the Java API.


Ah thanks 🙂 I haven't started yet so good to know


Weeell, with Kinsky it looks like the activity declined in September... 😕


@nha Clojure has been hopeless at keeping maintainers for Kafka bindings. I’d go directly to Java at this point. 😕


That's too bad 😕


I'll do that. Thanks 🙂