#onyx
2015-11-09
michaeldrogalis16:11:56

Next official release of Onyx will be out later this week. If anyone can upgrade and give us feedback on the 0.8.0-alpha1 release, that'd be super. 😄

spangler19:11:22

Hi @michaeldrogalis, trying to get 0.8.0-alpha1 to work now : )

spangler19:11:43

Got this funny exception trying to start:

java.lang.IllegalStateException: Capacity must be a positive power of 2 + TRAILER_LENGTH: capacity=1048448
java.lang.IllegalStateException: Could not initialise communication buffers

spangler19:11:50

trying to connect to Aeron

spangler19:11:01

Capacity of what?

spangler19:11:19

Where do I set the capacity? Is it on the Aeron side, or the Onyx side?

spangler19:11:24

was working fine before

michaeldrogalis20:11:30

@spangler: That's an Aeron message. Haven't seen that before. Before we go digging, just FYI that there's 1 breaking change: https://github.com/onyx-platform/onyx/blob/0.8.0-alpha1/changes.md#080

michaeldrogalis20:11:35

Seems unrelated though. Just informational

spangler20:11:47

Right, I updated that

michaeldrogalis20:11:29

Oh, I wonder if this is what Lucas was talking about the other day. Try RMing the directory Aeron creates. It might have stale files.
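A minimal sketch of that cleanup, assuming Aeron's default directory of <java.io.tmpdir>/aeron-<user> (on some Linux setups it may live under /dev/shm instead):

(require '[clojure.java.io :as io])

;; delete Aeron's buffer directory, children first
(let [dir (io/file (System/getProperty "java.io.tmpdir")
                   (str "aeron-" (System/getProperty "user.name")))]
  (doseq [f (reverse (file-seq dir))]
    (io/delete-file f true)))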

michaeldrogalis20:11:37

It's somewhere in /var for me, I believe. Brb, lunching

michaeldrogalis20:11:10

@spangler: I think what basically happened is that the Aeron folks made an optimization that requires an internal value they use to be a power of 2.
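Roughly the arithmetic behind the error, as a hedged sketch (assuming Aeron's 128-byte ring-buffer trailer): 1048448 = 2^20 - 128, the old buffer-file size, while the new check wants 2^n + TRAILER_LENGTH (e.g. 2^20 + 128 = 1048704), so the stale file fails until it is deleted:

(defn power-of-two? [n]
  (and (pos? n) (zero? (bit-and n (dec n)))))

;; the stale file's capacity minus the assumed 128-byte trailer is not
;; a power of two, so the new layout rejects it
(power-of-two? (- 1048448 128)) ;; => false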

spangler20:11:27

I see that now

spangler20:11:40

removing the directory fixed it!

spangler20:11:50

Now I have a new problem....

spangler20:11:04

                              java.lang.Thread.run              Thread.java:  745
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  617
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1142
               java.util.concurrent.FutureTask.run          FutureTask.java:  266
                                               ...                               
               clojure.core/binding-conveyor-fn/fn                 core.clj: 1916
   onyx.log.commands.common/start-new-lifecycle/fn               common.clj:  138
      onyx.peer.task-lifecycle.TaskLifeCycle/start       task_lifecycle.clj:  690
           onyx.peer.task-lifecycle/build-pipeline       task_lifecycle.clj:  602
                              clojure.core/ex-info                 core.clj: 4593
clojure.lang.ExceptionInfo: Failed to resolve or build plugin on the classpath, did you require/import the file that contains this plugin?
    data: 

michaeldrogalis20:11:07

Heh, okay. I'll add that piece of advice... somewhere.

spangler20:11:09

In my onyx.log

spangler20:11:24

I believe it may be talking about the onyx-kafka plugin?

spangler20:11:29

But I am requiring it

spangler20:11:36

and once again it was working with the previous version

michaeldrogalis20:11:40

Is there any data after that final line?

spangler20:11:59

It says 15-Nov-09 12:08:06 Ryans-MacBook-Pro-2.local INFO [onyx.peer.task-lifecycle] - [57bc3e82-6d42-4baf-919e-bc013a969d0c] Stopping Task LifeCycle for null

spangler20:11:09

and none of my tasks run

michaeldrogalis20:11:28

Hmm, okay. We didn't touch any of the resolving code, so unlikely to be a problem there. Thinking.

michaeldrogalis20:11:37

PM me the full onyx.log.

michaeldrogalis20:11:29

@spangler: We significantly tightened up the schema checks this release. I'm thinking this caught something that was already wrong, but the error message is poor. If you send along some code, happy to look at that, too.

michaeldrogalis20:11:22

"Stopping Task LifeCycle for null" means something blew up in the task setup, in this case resolving a symbol, and it wasn't able to grab the task name. So that's not the issue.

spangler20:11:59

So what is the stacktrace then?

michaeldrogalis20:11:29

Symbol resolution failure.

michaeldrogalis20:11:38

I can help, but I need to see more.

spangler20:11:04

Okay, figuring out how to show you some code then

spangler20:11:08

What do you need to see?

michaeldrogalis20:11:24

Let's start with project.clj and your catalog.

spangler20:11:17

@michaeldrogalis project.clj (at least, the deps):

(defproject aviary/peregrine "0.0.9"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [joda-time "2.2"]
                 [org.clojure/core.async "0.2.371"]
                 [io.forward/sendgrid-clj "1.0.1"]
                 [aviary/cicinnurus "0.0.14"]
                 [hiccup-bridge "1.0.1"]
                 [hickory "0.5.4"]
                 [com.draines/postal "1.11.3"]
                 [org.apache.commons/commons-daemon "1.0.9"]
                 [aviary/condor "0.0.123"
                  :exclusions [joda-time com.taoensso/timbre]]
                 [caribou/caribou-core "0.15.5"
                  :exclusions [org.clojure/core.cache]]
                 [clojure.jdbc/clojure.jdbc-c3p0 "0.3.2"]
                 [environ "0.5.0"]
                 [hiccup "1.0.5"]
                 [org.onyxplatform/onyx "0.8.0-alpha1"]
                 [org.onyxplatform/onyx-kafka "0.8.0-alpha1"]
                 [littlebird-aviary/conduit "0.0.20"
                  :exclusions [com.taoensso/encore]]]

spangler20:11:37

jvm-opts

:jvm-opts ^:replace ["-server"
                     "-Xmx4g"
                     "-XX:-OmitStackTraceInFastThrow"
                     "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n"]

spangler20:11:17

catalog:

[{:onyx/name :read-messages
  :onyx/plugin :onyx.plugin.kafka/read-messages
  :onyx/type :input
  :onyx/medium :kafka
  :kafka/topic (str topic "-input")
  :kafka/group-id "onyx-consumer"
  :kafka/fetch-size 307200
  :kafka/chan-capacity 1000
  :kafka/zookeeper zookeeper
  :kafka/offset-reset :smallest
  :kafka/force-reset? true
  :kafka/empty-read-back-off 500
  :kafka/commit-interval 500
  :kafka/deserializer-fn :peregrine.onyx.tasks/kafka-deserialize
  :kafka/partition 0
  :onyx/min-peers 1
  :onyx/max-peers 1
  :onyx/batch-size 100}

 {:onyx/name :fan-out
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks/fan-out}

 {:onyx/name :followers
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks.profile/followers}

 {:onyx/name :timeline
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks.profile/timeline}

 {:onyx/name :blog
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks.profile/blog}

 {:onyx/name :fullcontact
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks.profile/fullcontact}

 {:onyx/name :merge-in
  :onyx/type :function
  :onyx/batch-size batch-size
  :onyx/fn :peregrine.onyx.tasks/merge-in}

 {:onyx/name :write-messages
  :onyx/plugin :onyx.plugin.kafka/write-messages
  :onyx/type :output
  :onyx/medium :kafka
  :kafka/topic (str topic "-output")
  :kafka/zookeeper zookeeper
  :kafka/serializer-fn :peregrine.onyx.tasks/kafka-serialize
  :onyx/batch-size batch-size}]
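(For reference, a hedged sketch of what the transit (de)serializers named in this catalog might look like; the actual peregrine.onyx.tasks implementations aren't shown in the thread.)

(ns peregrine.onyx.tasks
  (:require [cognitect.transit :as transit])
  (:import [java.io ByteArrayInputStream ByteArrayOutputStream]))

(defn kafka-serialize ^bytes [segment]
  (let [out (ByteArrayOutputStream.)]
    (transit/write (transit/writer out :json) segment)
    (.toByteArray out)))

(defn kafka-deserialize [^bytes bs]
  (transit/read (transit/reader (ByteArrayInputStream. bs) :json)))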

spangler20:11:37

I pass in a unique topic for each job, because I saw that in your onyx-kafka examples

michaeldrogalis20:11:01

We do that for testing - you can use the same topic if you like.

michaeldrogalis20:11:06

I don't see anything wrong here. Hm

spangler20:11:39

So if two jobs get started and they are using the same topic, how are the messages distinguished?

spangler20:11:02

Do you put your own job id on the topic or something?

michaeldrogalis20:11:13

They'd read the same sequence of messages in parallel. Kafka is agnostic to which processes are reading from it.

lucasbradstreet20:11:47

is this required? :peregrine.onyx.tasks/kafka-serialize

lucasbradstreet20:11:53

the peregrine.onyx.tasks bit I mean

spangler20:11:08

Right, I mean if there are two different jobs using the same topic, how do messages get routed only to the job they're intended for?

spangler20:11:26

@lucasbradstreet Yeah, it says to provide a serialize method?

lucasbradstreet20:11:35

Yes, but have you required the ns that it belongs in?

spangler20:11:45

We are using transit to encode our messages

spangler20:11:48

Let me check

michaeldrogalis20:11:51

@spangler: They don't. Kafka isn't designed to work that way.

spangler20:11:20

@michaeldrogalis Right, so I see that being a problem with sharing topics between jobs

spangler20:11:28

unless I am missing something there?

michaeldrogalis20:11:05

I mean, it sounds like it might be a semantic problem for you, but lots of people design multiple applications to read from the same log and see the same messages, without coordination. That's a feature

spangler20:11:12

@lucasbradstreet Yeah those serialize/deserialize methods are in the same namespace as the catalog actually

spangler20:11:43

@michaeldrogalis Ah, so I just put some kind of job id on there perhaps?

spangler20:11:07

The thing is I am using them as batch jobs, so I require the :done sentinel to know when the job has completed

lucasbradstreet20:11:28

Yeah, different topic names is probably the best solution?

spangler20:11:48

If two jobs are using the same topic, then they would both receive the :done for one of them completing
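A minimal sketch of that per-job-topic approach (job-topics is an invented helper, not part of onyx-kafka):

(defn job-topics
  "Give each job its own topics so its :done sentinel can't leak into
  another job sharing the cluster."
  [base]
  (let [id (java.util.UUID/randomUUID)]
    {:input  (str base "-" id "-input")
     :output (str base "-" id "-output")}))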

lucasbradstreet20:11:56

Ah, I think you need to require that ns from whatever starts the peers

michaeldrogalis20:11:06

@spangler: Ah, gotcha. That bit was unclear to me

lucasbradstreet20:11:08

If you’re submitting the job in another process then there’s no guarantee it was resolved

spangler20:11:35

Hmm okay, funny it works in the old version

spangler20:11:52

I will try that though

michaeldrogalis20:11:49

Yeah, that is odd.

spangler20:11:37

Hmm... also, how do I configure the kafka consumer you are using in onyx-kafka? I need to set the fetch.message.max.bytes to something higher

spangler20:11:45

There it is, thank you

lucasbradstreet20:11:14

I was thinking we should probably have an additional opts key that we merge into the config that we pass to the producer/consumer

lucasbradstreet20:11:18

Just for anything else that we miss
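A hedged sketch of that idea (:kafka/consumer-opts is an invented key here, not the plugin's actual API):

(defn build-consumer-config
  "Merge a free-form opts map from the task map over the plugin's base
  config, so unanticipated Kafka options need no explicit support."
  [task-map base-config]
  (merge base-config (:kafka/consumer-opts task-map)))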

spangler20:11:20

I see I am even setting it in the catalog

michaeldrogalis20:11:37

@lucasbradstreet: Weird thing about that error is that the :data key is empty.

lucasbradstreet20:11:56

Yeah, actually, I meant to create an issue about that

lucasbradstreet20:11:00

I’ve seen that elsewhere

michaeldrogalis20:11:48

Hum, first time I've run into it.

lucasbradstreet20:11:38

I’ve definitely seen it before. I wonder if it has something to do with the workaround we used to fix those swallowed exceptions

spangler20:11:50

Hmm... just increased the :kafka/fetch-size to 8320000, but I am still getting this exception in onyx.log:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1089614 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration

spangler20:11:38

Is there another option I need to set?

spangler20:11:44

Not seeing anything else relevant in the docs...

spangler20:11:03

Is it the fetch.message.max.bytes that you guys are setting with that?

lucasbradstreet20:11:48

We’re not actually setting that tho are we?

lucasbradstreet20:11:33

We’re doing a fetch request of that size

lucasbradstreet20:11:42

But I don’t think we configure max.request.size on the consumer

michaeldrogalis20:11:37

I thought max.request.size was a server/producer option.

lucasbradstreet21:11:14

Yeah, it’s a producer option

spangler21:11:46

Right, I think it is failing on :write-messages

spangler21:11:52

Those are the big ones

spangler21:11:11

:read-messages only deals with tiny things

lucasbradstreet21:11:33

Yeah, we have no way to configure that currently

spangler21:11:01

Oh! Hmm....

lucasbradstreet21:11:21

Just grab it directly from the task map and put it in that config map

lucasbradstreet21:11:24

and I think you should be good
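A hedged sketch of the suggested patch, using the :kafka/request-size key spangler adds further down:

(defn producer-config
  "Pull the size out of the task map, when present, and put it in the
  config map handed to the Kafka producer."
  [task-map base-config]
  (cond-> base-config
    (:kafka/request-size task-map)
    (assoc "max.request.size" (:kafka/request-size task-map))))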

spangler21:11:36

I will take a look

lucasbradstreet21:11:35

I think I’d also like it to take an opts map that gets merged into that config map

spangler21:11:41

Yes, that would be ideal

lucasbradstreet21:11:48

That would prevent us from having to support every option

lucasbradstreet21:11:20

Yeah, actually I wouldn't even bother with supporting both

michaeldrogalis21:11:44

It's not evident to me that taking an opaque map from the catalog entry is the best way to go. While I agree that we don't want to be stuck supporting every switch for the consumer/producer that we're ultimately wrapping, I also don't want to make it look like it's a totally transparent usage. Splitting reading across multiple machines doesn't offer the same interface that whatever we're using under the covers might be offering. It might be a more viable option to do a map via a task lifecycle, though.

michaeldrogalis21:11:58

Just a thought -- anyway.

lucasbradstreet21:11:02

I’m OK with it as long as we’re supporting a particular consumer/producer

spangler21:11:12

@michaeldrogalis Maybe it is something that can be passed in on boot up, rather than in the catalog?

spangler21:11:30

It seems more like a boot up configuration option that won't be changing between jobs

michaeldrogalis21:11:44

That's a possibility, yeah.

spangler21:11:45

You guys have a reasonable default for max.request.size?

lucasbradstreet21:11:53

I guess we could just support injecting a custom producer/consumer

michaeldrogalis21:11:00

Whatever clj-kafka is defaulting to.

michaeldrogalis21:11:27

Gosh, it's weird that the :data key is absent.

lucasbradstreet21:11:42

We just default to what kafka defaults to.

spangler21:11:49

FYI, requiring that namespace in the onyx bootup lib actually got it to work

spangler21:11:03

The reasons are mysterious, but at least it works

lucasbradstreet21:11:05

Cool, figured it would

michaeldrogalis21:11:40

Cool, good to hear. Yes, put all your namespaces that need to be required in one place - that's a helpful idiom. I'll look into why the error message got swallowed in the meantime.

lucasbradstreet21:11:40

Yeah, it needs to be in the same namespace that starts the peers, or one transitively required from that ns
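A minimal sketch of that idiom, assuming a hypothetical peregrine.launcher ns as the peer entry point:

(ns peregrine.launcher
  "The process that starts the peers must load every ns holding task
  fns, plugins, and (de)serializers, or keyword resolution fails when
  the task lifecycle boots."
  (:require [onyx.api]
            [onyx.plugin.kafka]
            [peregrine.onyx.tasks]
            [peregrine.onyx.tasks.profile]))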

lucasbradstreet21:11:02

I’m not sure how I feel about all the solutions to this kafka config issue. I don’t really like injecting a producer/consumer either, since we still want most of the rest of the lifecycle, so we’ll need users to start supplying two lifecycle calls

michaeldrogalis21:11:43

Will think on it, and for now add a patch to add the option @spangler is looking for.

lucasbradstreet21:11:45

We’re pretty much using their exact producer/consumer, and if we start allowing users to inject them then it’s not like we can easily switch anyway

michaeldrogalis21:11:53

Yeah, good point

spangler21:11:10

I just patched it

lucasbradstreet21:11:17

Happy to debate it later

spangler21:11:32

Yeah we will want to be able to override arbitrary config options for sure

michaeldrogalis21:11:08

@spangler: So you're up and running now?

spangler21:11:28

Hmm.... now I just got that blank :data key error four times in a row in my onyx.log!

spangler21:11:40

Something is fishy

lucasbradstreet21:11:52

Yeah I’m pretty sure it’s always that way

michaeldrogalis21:11:38

We upgraded Timbre recently. Maybe something went wrong there

lucasbradstreet21:11:46

It was as I thought

spangler21:11:19

So should I wait then?

michaeldrogalis21:11:28

Aha! That thing.

spangler21:11:34

Really need to get this up and running... I guess I can go back to the previous version

lucasbradstreet21:11:46

@spangler: it shouldn’t make it crash

lucasbradstreet21:11:55

but it will prevent you from getting as much info

spangler21:11:00

... Yet, it is failing

lucasbradstreet21:11:01

I can push up a snapshot with that fix if you’d like

lucasbradstreet21:11:09

Yus. So you’ve got other issues

spangler21:11:22

So you are saying it will illuminate my other issues then

spangler21:11:31

Sure, would be most welcome

lucasbradstreet21:11:20

Give it a few mins to ci/build and a snapshot will be up

spangler21:11:54

Just add a -SNAPSHOT at the end of the 0.8.0-alpha1 then?

michaeldrogalis21:11:22

0.8.0-SNAPSHOT once the build finishes

lucasbradstreet21:11:21

Sorry, needs to rebuild on master, but it’ll definitely be up in ~4

spangler21:11:02

Is that up? The snapshot I'm pulling in has a timestamp from Saturday

lucasbradstreet21:11:46

You can fix the snapshot version

spangler21:11:59

Hmm... just cleared my .m2 and it still pulled it in

lucasbradstreet21:11:00

try 0.8.0-20151109.213854-17
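i.e., pinning the timestamped snapshot in project.clj:

[org.onyxplatform/onyx "0.8.0-20151109.213854-17"]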

spangler21:11:28

There we go

spangler21:11:41

Okay, here is the real error then:

[{:type org.apache.kafka.common.config.ConfigException
             :message "Invalid value 8320000 for configuration max.request.size: Expected value to be an number."

spangler21:11:59

How is 8320000 not a number?

spangler21:11:14

Does it get serialized somewhere?

lucasbradstreet21:11:37

did you pass a string?

michaeldrogalis21:11:03

The code is pretty thin, we're not doing much other than passing the map to clj-kafka

lucasbradstreet21:11:04

maybe make it a long?

spangler21:11:12

This is the line I added: :kafka/request-size 8320000

michaeldrogalis21:11:22

Yeah, it's talking to Scala, so maybe it's something weird like that. Shrug

spangler21:11:47

Got it, yeah I'll figure it out

lucasbradstreet21:11:01

Yeah, very weird

spangler22:11:45

Okay it needs to be a string

spangler22:11:01

That must be why it says it must be a number

spangler22:11:42

A little gallows humor there
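The working form, then, roughly sketched (where exactly the value gets stringified depends on spangler's patch to the plugin):

;; Kafka's config parser rejected the Clojure Long, so the value goes
;; into the producer config as a string
{"max.request.size" (str 8320000)}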

spangler22:11:39

Hmm... okay

spangler22:11:54

Tasks launched, everything seemed fine, but I never got my :done

lucasbradstreet22:11:58

Must have tried calling parse on it, caught the exception, and figured that's the only reason it would fail to parse

spangler22:11:01

job still thinks it is running

spangler22:11:27

Yeah, so fixed one issue, now have another

lucasbradstreet22:11:30

Did you get everything else?

spangler22:11:46

Yeah, the tasks all completed

spangler22:11:51

but it is waiting on :done

lucasbradstreet22:11:07

And you got other messages at the output?

spangler22:11:46

Well, not sure

spangler22:11:55

as it is waiting for all of them at once

spangler22:11:02

So no, I haven't gotten any of them

lucasbradstreet22:11:18

Are you using the Kafka plugin's take-segments?

lucasbradstreet22:11:10

K. Copy it from there into your code and add some debug messages to see if it can read anything.

spangler22:11:27

Copy take-segments?

lucasbradstreet22:11:32

Yeah. Probably easier than rewriting the code to read from the topic

lucasbradstreet22:11:19

Also check for other exceptions in the log

lucasbradstreet22:11:32

Your job might still just be getting killed early

spangler22:11:50

No errors in onyx.log

lucasbradstreet22:11:53

Anyway sleep for real this time. Gl

lucasbradstreet22:11:19

Just a quick check: you're passing in the deserialiser fn to take-segments, not the kw path to it?

lucasbradstreet22:11:56

Total shot in the dark

spangler22:11:19

Not sure what you mean

lucasbradstreet22:11:49

The task map takes a keyword pointing to the deserialiser function

lucasbradstreet22:11:00

But take-segments takes an actual function
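The distinction, sketched (kw->fn mirrors what Onyx does internally; the name here is invented):

(defn kw->fn
  "Resolve a namespaced keyword like
  :peregrine.onyx.tasks/kafka-deserialize to the fn it names; the task
  map carries the keyword, while take-segments wants the resolved fn."
  [kw]
  (let [ns-sym (symbol (namespace kw))]
    (require ns-sym)
    (deref (resolve (symbol (namespace kw) (name kw))))))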

spangler22:11:22

Yes, I am passing in the function

spangler22:11:25

good thought though

michaeldrogalis22:11:14

Gotta be off too. Catch ya

spangler23:11:25

Okay, submitted a pull request

spangler23:11:38

I got everything working locally

spangler23:11:48

(to the onyx-kafka plugin)

spangler23:11:31

Needed to be able to add some config to the take-segments consumer as well