#onyx
2018-11-09
guillaume 15:11:43

Hi, I'm reading the docs and trying the examples. I'm wondering if there's a way to redirect the output of a window aggregation to a catalog entry that is part of the workflow. It seems that every example dumps the window either to stdout or to a db (with side effects). However, what if I'd like to do transformations after the aggregation? Is that possible?

eriktjacobsen 22:11:20

@guillaume.carbonneau here is an example

(defn health-check-memory-task
  [task-name peer-config task-opts]
  {:task {:task-map (merge
                      {:onyx/name task-name
                       :onyx/group-by-key :hash
                       :onyx/flux-policy :recover
                       :onyx/n-peers 2
                       :onyx/type :reduce}
                      task-opts)
          :windows [{:window/id :health-check-memory
                     :window/task task-name
                     :window/type :global
                     :window/aggregation ::health-check-memory-aggregation}]
          :triggers [{:trigger/window-id :health-check-memory
                      :trigger/id :health-check-memory0
                      :trigger/emit ::emit-health-check
                      :trigger/on :onyx.triggers/segment
                      :trigger/threshold [1 :elements]}]}})
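
(defn emit-health-check
  "Referenced by :trigger/emit. Returns the :health-check value from the
  window state, or nil when the trigger fires for :job-completed."
  [event window trigger window-data state]
  (when-not (= :job-completed (:event-type window-data))
    (:health-check state)))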
Then just connect the windowed task to your downstream tasks in the workflow, and optionally add flow conditions.
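
A rough sketch of that wiring, in case it helps. The namespace, the task names (`:in`, `:format-alert`, `:out`), the `format-alert` and `unhealthy?` functions, and the `:status` key are all made up for illustration; only `:health-check-memory` corresponds to the task above, and this assumes the emitted value is a map (a segment).

```clojure
(ns my.app.health) ; hypothetical namespace

;; Workflow: the windowed :reduce task feeds an ordinary function task,
;; so whatever :trigger/emit returns is transformed after aggregation.
(def workflow
  [[:in :health-check-memory]
   [:health-check-memory :format-alert]
   [:format-alert :out]])

;; Hypothetical post-aggregation transformation applied to each emitted segment.
(defn format-alert [segment]
  (assoc segment :alert? true))

;; Catalog entry for the downstream task.
(def format-alert-entry
  {:onyx/name :format-alert
   :onyx/fn ::format-alert
   :onyx/type :function
   :onyx/batch-size 20})

;; Optional flow condition: only route emitted segments that look unhealthy.
;; The :status key is an assumption about the shape of the emitted segment.
(defn unhealthy? [event old-segment segment all-new-segments]
  (not= :ok (:status segment)))

(def flow-conditions
  [{:flow/from :health-check-memory
    :flow/to [:format-alert]
    :flow/predicate ::unhealthy?}])
```

You'd pass `workflow` and `flow-conditions` in the job map as usual, add `format-alert-entry` to the catalog alongside the `:task-map` from the task above, and include catalog entries for `:in` and `:out` as well.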