#onyx
2017-08-24
dadair 22:08:47

This is a bit of a vague question, but I can't figure out where I am going wrong. I am getting the following exception, which prevents my workflow from completing: "Handling uncaught exception thrown inside task lifecycle :lifecycle/prepare-batch. Killing the job. -> Exception type: clojure.lang.ExceptionInfo. Exception message: Wrong number of args (1) passed to: flat/eval73774/fn--73775". My workflow looks like this:

(def workflow
  [[:read-actions-performed :process]
   [:process :wrap-errors]
   [:process :wrap-actions]
   [:process :wrap-statements]
   [:wrap-errors :write-actions-derived]
   [:wrap-actions :write-actions-derived]
   [:wrap-statements :write-statements-requested]])
with flow conditions:
(def flow-conditions
  [{:flow/from :process
    :flow/to [:wrap-errors]
    :flow/predicate [::errors?]}

   {:flow/from :process
    :flow/to [:wrap-actions]
    :flow/predicate [:and ::actions? [:not ::errors?]]}

   {:flow/from :process
    :flow/to [:wrap-statements]
    :flow/predicate [:and ::statements? [:not ::errors?]]}])
I have println statements in every task in the workflow and in every predicate in the flow conditions. The only println that fires is the one in :process; then the exception is thrown, which prevents any further processing. I can't figure out where I am hitting an arity exception. Does anyone have any ideas?
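
Since the error is an arity exception, the flow-condition predicates are one place worth ruling out. As far as the Onyx documentation describes it, a predicate receives at least four arguments: the event map, the old segment, the new segment, and the collection of all new segments. The sketch below shows that shape in plain Clojure; the function bodies and the :errors and :actions keys are assumptions for illustration, not the predicates from this job.

```clojure
;; Hypothetical predicates; the :errors and :actions keys are assumed.
(defn errors?
  [event old-segment new-segment all-new-segments]
  (boolean (seq (:errors new-segment))))

(defn actions?
  [event old-segment new-segment all-new-segments]
  (boolean (seq (:actions new-segment))))

;; Calling a predicate by hand with four arguments is a quick way to
;; confirm its arity outside of a running job.
(def routed-to-errors?
  (errors? {} {:id 1} {:id 1 :errors ["bad input"]} []))
```

A predicate written with fewer parameters would fail with a similar "Wrong number of args" message, although here the println output suggests the predicates were never reached.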

michaeldrogalis 22:08:39

@dadair Anything more in that stacktrace?

michaeldrogalis 22:08:06

Whatever’s blowing up is unfortunately nested in this thing: flat/eval73774/fn--73775

dadair 22:08:16

java.lang.Thread.run              Thread.java:  748
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  617
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1142
                                               ...                               
                 clojure.core.async/thread-call/fn                async.clj:  442
 onyx.peer.task-lifecycle/start-task-lifecycle!/fn       task_lifecycle.clj:  939
      onyx.peer.task-lifecycle/run-task-lifecycle!       task_lifecycle.clj:  466
                onyx.peer.task-lifecycle/iteration       task_lifecycle.clj:  448
    onyx.peer.task-lifecycle.TaskStateMachine/exec       task_lifecycle.clj:  859
onyx.peer.task-lifecycle/wrap-lifecycle-metrics/fn       task_lifecycle.clj:  886
        onyx.peer.task-lifecycle/build-apply-fn/fn       task_lifecycle.clj:  523
                      onyx.peer.transform/apply-fn            transform.clj:   64
               onyx.peer.transform/apply-fn-single            transform.clj:   18
                                clojure.core/doall                 core.clj: 3128
                                clojure.core/dorun                 core.clj: 3113
                                  clojure.core/seq                 core.clj:  137
                                               ...                               
                               clojure.core/map/fn                 core.clj: 2733
            onyx.peer.transform/apply-fn-single/fn            transform.clj:   21
         onyx.peer.transform/collect-next-segments            transform.clj:    8
      onyx.peer.transform/collect-next-segments/fn            transform.clj:    8
                                               ...                               
                         justice.onyx.flat/process                 flat.clj:   41
                         justice.onyx.flat/process                 flat.clj:   46
                        justice.onyx.flat/dispatch                 flat.clj:   39

michaeldrogalis 22:08:24

Which version of Onyx?

dadair 22:08:39

yeah, and I have no idea what the flat/eval73774/fn--73775 thing is >.<

michaeldrogalis 22:08:56

What’s the signature on your :onyx/fn for :process?

dadair 22:08:23

(defn process [segment] ...)

dadair 22:08:48

(defn build-catalog
  [bs bt opts]
  [{:onyx/name :read-actions-performed
    :onyx/plugin :onyx.plugin.kafka/read-messages
    :onyx/type :input
    :onyx/medium :kafka
    :kafka/topic "actions-performed"
    :kafka/receive-buffer-bytes 65536
    :kafka/zookeeper (:zk-addr opts)
    :kafka/offset-reset :earliest
    :kafka/deserializer-fn ::deserialize-fn
    :kafka/wrap-with-metadata? true
    :onyx/batch-timeout bt
    :onyx/max-peers 1
    :onyx/batch-size bs
    :onyx/doc "Reads messages from a Kafka topic"}

   {:onyx/name :process
    :onyx/type :function
    :onyx/fn ::process
    :onyx/max-peers 1
    :onyx/batch-size bs
    :onyx/batch-timeout bt}

   {:onyx/name :wrap-errors
    :onyx/type :function
    :onyx/fn ::wrap-client-errors-message
    :onyx/max-peers 1
    :onyx/batch-size bs
    :onyx/batch-timeout bt}

   {:onyx/name :wrap-actions
    :onyx/type :function
    :onyx/fn ::wrap-actions-derived-message
    :onyx/max-peers 1
    :onyx/batch-size bs
    :onyx/batch-timeout bt}

   {:onyx/name :wrap-statements
    :onyx/type :function
    :onyx/fn ::wrap-statements-request-message
    :onyx/max-peers 1
    :onyx/batch-size bs
    :onyx/batch-timeout bt}

   {:onyx/name :write-actions-derived
    :onyx/plugin :onyx.plugin.kafka/write-messages
    :onyx/type :output
    :onyx/medium :kafka
    :kafka/topic "actions-derived"
    :kafka/zookeeper (:zk-addr opts)
    :kafka/serializer-fn ::serialize-fn
    :kafka/request-size 307200
    :onyx/batch-size bs
    :onyx/batch-timeout bt
    :onyx/max-peers 1
    :onyx/doc "Writes segments to the `actions-derived` Kafka topic."}

   {:onyx/name :write-statements-requested
    :onyx/plugin :onyx.plugin.kafka/write-messages
    :onyx/type :output
    :onyx/medium :kafka
    :kafka/topic "statements-requested"
    :kafka/zookeeper (:zk-addr opts)
    :kafka/serializer-fn ::serialize-fn
    :kafka/request-size 307200
    :onyx/batch-size bs
    :onyx/batch-timeout bt
    :onyx/max-peers 1
    :onyx/doc "Writes segments to the `statements-requested` Kafka topic"}])

michaeldrogalis 22:08:56

Hm, weird. I was expecting there to be a signature mismatch, but that looks okay.

michaeldrogalis 22:08:06

What’s happening at flat.clj line 39?

michaeldrogalis 22:08:15

That looks like where the stacktrace bottoms out.

dadair 22:08:06

Ah, I had a defmulti that still had the old arity after I removed arguments from the method call.
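
For readers hitting the same message, here is a minimal sketch of how a stale defmulti can produce it. The multimethod name `handle` and the segment shape are hypothetical, not taken from flat.clj. The dispatch function is anonymous, which is why the exception names something like eval…/fn--… rather than a named function. The second half relies on the fact that defmulti leaves an existing multimethod untouched, so the old var is removed before the corrected definition is evaluated.

```clojure
;; Hypothetical multimethod; names and segment shape are illustrative only.
(defmulti handle
  ;; Stale dispatch fn that still expects two arguments.
  (fn [segment opts] (:type segment)))

(defmethod handle :action
  [segment]
  (assoc segment :handled? true))

;; The call site now passes one argument, so the anonymous dispatch fn
;; throws before any method is selected.
(def stale-error
  (try
    (handle {:type :action})
    (catch clojure.lang.ArityException e
      (.getMessage e))))

;; defmulti does nothing when the var already holds a multimethod, so
;; remove the old var before evaluating the corrected definition.
(ns-unmap *ns* 'handle)

(defmulti handle
  (fn [segment] (:type segment)))

(defmethod handle :action
  [segment]
  (assoc segment :handled? true))

(def fixed-result (handle {:type :action}))
```

In a long-running REPL session the same effect can appear even after the source file has been corrected, because reloading the namespace does not replace the dispatch function unless the var is unmapped or the process is restarted.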