#onyx
2017-12-10
twashing19:12:19

… - Is there an example of multiple calls to submit-job for separate but overlapping workflows (core.async or kafka)?

twashing19:12:23

- So that would be i) an input core.async channel (or kafka topic) that goes into an onyx processing function, which then ii) outputs to a channel (or topic), which subsequently iii) is the input to a downstream onyx processing function.

lucasbradstreet19:12:51

I don’t have any examples handy, but there shouldn’t be any problem doing so.

twashing19:12:30

I have these 2 workflows (using core.async channels).

(def workflow1
  [[:scanner-command :ibgateway]
   [:ibgateway :scanner-command-result]
   [:ibgateway :scanner]])

(def workflow2
  [[:scanner :market-scanner]
   [:market-scanner :filtered-stocks]])

twashing19:12:33

I loop over both and call (onyx.api/submit-job peer-config job) on each iteration. Both return successful job-ids, so I think they both went through.
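
For reference, a minimal sketch of that loop, assuming the catalogs, lifecycles, and peer-config are defined elsewhere (catalog1, catalog2, lifecycles1, and lifecycles2 are illustrative names, not the code in question):

(def jobs
  [{:workflow workflow1
    :catalog catalog1
    :lifecycles lifecycles1
    :task-scheduler :onyx.task-scheduler/balanced}
   {:workflow workflow2
    :catalog catalog2
    :lifecycles lifecycles2
    :task-scheduler :onyx.task-scheduler/balanced}])

;; submit-job returns a map containing the :job-id for each submission
(doseq [job jobs]
  (let [{:keys [job-id]} (onyx.api/submit-job peer-config job)]
    (println "Submitted job" job-id)))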

lucasbradstreet19:12:36

Are you setting up the channels manually or are you using the convenience functions to generate some?

lucasbradstreet19:12:53

If it’s the latter, it’s probably generating two channels for :scanner
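
To illustrate the distinction (not the code in question): a lifecycle injector that constructs a channel inline gives every job its own channel, whereas one that references a shared var gives both jobs the same channel.

(require '[clojure.core.async :refer [chan]])

;; Each call creates a fresh channel, so two jobs injected this way
;; would end up with separate, unconnected :scanner channels.
(defn inject-generated-ch [event lifecycle]
  {:core.async/chan (chan 1000)})

;; Referencing a single var gives both jobs the very same channel.
(def shared-scanner-ch (chan 1000))

(defn inject-shared-ch [event lifecycle]
  {:core.async/chan shared-scanner-ch})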

twashing19:12:10

I’m manually creating the channels and injecting them into the lifecycles like so.

(defn inject-scanner-command-ch [event lifecycle]
  {:core.async/buffer in-buffer
   :core.async/chan base/chan-scanner-command})
(defn inject-scanner-command-result-ch [event lifecycle] {:core.async/chan base/chan-scanner-command-result})
(defn inject-scanner-ch [event lifecycle] {:core.async/chan base/chan-scanner})

(def in-calls-scanner-command {:lifecycle/before-task-start inject-scanner-command-ch})
(def out-calls-scanner-command-result {:lifecycle/before-task-start inject-scanner-command-result-ch})
(def out-calls-scanner {:lifecycle/before-task-start inject-scanner-ch})

(defn lifecycles [platform-type]
  ({:kafka []
    :onyx [{:lifecycle/task :scanner-command
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/in-calls-scanner-command}
           {:lifecycle/task :scanner-command
            :lifecycle/calls :onyx.plugin.core-async/reader-calls}

           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner-command-result}
           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}

           {:lifecycle/task :scanner
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner}
           {:lifecycle/task :scanner
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}]}
   platform-type))
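
For completeness, the core.async tasks referenced by these lifecycles would also need catalog entries along these lines (a sketch only; batch sizes and doc strings are illustrative):

(def example-catalog-entries
  [{:onyx/name :scanner-command
    :onyx/plugin :onyx.plugin.core-async/input
    :onyx/type :input
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-size 10
    :onyx/doc "Reads segments from base/chan-scanner-command"}

   {:onyx/name :scanner
    :onyx/plugin :onyx.plugin.core-async/output
    :onyx/type :output
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-size 10
    :onyx/doc "Writes segments to base/chan-scanner"}])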

twashing19:12:23

… So the :scanner channel in both workflows is the same channel (`base/chan-scanner`).
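
A minimal sketch of what those shared vars might look like (the namespace name and buffer sizes are assumptions, not the actual code):

;; hypothetical namespace holding the shared channels
(ns com.interrupt.streaming.platform.base
  (:require [clojure.core.async :refer [chan]]))

(def chan-scanner-command (chan 1000))
(def chan-scanner-command-result (chan 1000))
;; shared between job 1 (as its output) and job 2 (as its input)
(def chan-scanner (chan 1000))
(def chan-filtered-stocks (chan 1000))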

twashing19:12:11

I can see that messages are processed from base/chan-scanner-command to base/chan-scanner.

twashing19:12:03

But my repl hangs when trying to consume from a downstream channel.
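
One way to probe a downstream channel without parking the REPL indefinitely is to poll with a timeout; this is plain core.async, and the channel name below is assumed:

(require '[clojure.core.async :refer [alts!! timeout]])

;; Returns the next segment, or :timed-out after 5s (also returned if the
;; channel is closed), rather than blocking forever like a bare <!!.
(defn try-take [ch]
  (let [[v _] (alts!! [ch (timeout 5000)])]
    (if (nil? v) :timed-out v)))

(try-take base/chan-filtered-stocks)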

twashing19:12:10

Does this make sense? Let me know what other context I should provide.

lucasbradstreet20:12:08

chan-scanner-command is a var with the channel?

lucasbradstreet20:12:30

You’ve tested that they get to the output channel on the first job, right?

twashing20:12:22

Yes. Correct on both points.

lucasbradstreet20:12:16

Have you looked at your onyx.log for any errors?

lucasbradstreet20:12:56

Sorry, I’m really busy today so I can’t really help you dig in further. There’s no technical reason that I can think of why it shouldn’t work.

lucasbradstreet20:12:12

We’ve chained onyx jobs together via kafka topics - core.async should be no different.
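
For comparison, chaining via Kafka means job 1’s output task and job 2’s input task point at the same topic. A hedged sketch using the onyx-kafka plugin (the topic, zookeeper address, and serializer fns are illustrative, and exact keys depend on the plugin version):

;; Job 1: write :scanner segments to a topic
{:onyx/name :scanner
 :onyx/plugin :onyx.plugin.kafka/write-messages
 :onyx/type :output
 :onyx/medium :kafka
 :kafka/topic "scanner"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/serializer-fn :com.interrupt.streaming.serde/serialize
 :onyx/batch-size 10}

;; Job 2: read the same topic as its input
{:onyx/name :scanner
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "scanner"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/deserializer-fn :com.interrupt.streaming.serde/deserialize
 :kafka/offset-reset :earliest
 :onyx/min-peers 1
 :onyx/max-peers 1
 :onyx/batch-size 10}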

lucasbradstreet20:12:36

I suspect the second job is erroring and killing itself

twashing20:12:07

No, not at all. Thanks for responding as far as you have.

twashing20:12:31

Let me have a look at onyx.log.