This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2015-11-09
Next official release of Onyx will be out later this week. If anyone can upgrade and give us feedback on the 0.8.0-alpha1 release, that'd be super. 😄
Hi @michaeldrogalis, trying to get 0.8.0-alpha1 to work now : )
Got this funny exception trying to start:
java.lang.IllegalStateException: Capacity must be a positive power of 2 + TRAILER_LENGTH: capacity=1048448
java.lang.IllegalStateException: Could not initialise communication buffers
@spangler: That's an Aeron message. Haven't seen that before. Before we go digging, just FYI that there's 1 breaking change: https://github.com/onyx-platform/onyx/blob/0.8.0-alpha1/changes.md#080
Seems unrelated though. Just informational
Oh, I wonder if this is what Lucas was talking about the other day. Try RMing the directory Aeron creates. It might have stale files.
It's somewhere in /var for me, I believe. Brb, lunching.
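A sketch of that cleanup, assuming Aeron's default media-driver directory of `<java.io.tmpdir>/aeron-<user.name>` (on macOS the tmpdir lands under /var/folders, which matches "somewhere in /var"); check where your media driver actually writes before deleting:

```clojure
;; Sketch only: delete Aeron's media-driver directory so it is recreated
;; with the new buffer layout on the next start. The default location
;; (<java.io.tmpdir>/aeron-<user.name>) is an assumption.
(require '[clojure.java.io :as io])

(defn delete-recursively! [f]
  (let [f (io/file f)]
    (when (.isDirectory f)
      (doseq [child (.listFiles f)]
        (delete-recursively! child)))
    (.delete f)))

(delete-recursively!
 (io/file (System/getProperty "java.io.tmpdir")
          (str "aeron-" (System/getProperty "user.name"))))
```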
@spangler: I think what basically happened is that the Aeron folks made an optimization that requires an internal value they use to be a power of 2.
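As an illustration of that constraint (a sketch, not Aeron's actual validation code; the trailer length is a parameter here, not Aeron's real constant), a capacity check along these lines would reject a stale buffer file:

```clojure
;; Sketch: Aeron validates that the usable term length (total capacity
;; minus a fixed trailer) is a power of two. A file created by an older
;; Aeron can fail this even though its size was valid under the old layout.
(defn power-of-two? [n]
  (and (pos? n) (zero? (bit-and n (dec n)))))

(defn valid-capacity? [capacity trailer-length]
  (power-of-two? (- capacity trailer-length)))
```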
java.lang.Thread.run Thread.java: 745
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java: 617
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java: 1142
java.util.concurrent.FutureTask.run FutureTask.java: 266
...
clojure.core/binding-conveyor-fn/fn core.clj: 1916
onyx.log.commands.common/start-new-lifecycle/fn common.clj: 138
onyx.peer.task-lifecycle.TaskLifeCycle/start task_lifecycle.clj: 690
onyx.peer.task-lifecycle/build-pipeline task_lifecycle.clj: 602
clojure.core/ex-info core.clj: 4593
clojure.lang.ExceptionInfo: Failed to resolve or build plugin on the classpath, did you require/import the file that contains this plugin?
data:
Heh, okay. I'll add that piece of advice... somewhere.
Is there any data after that final line?
It says 15-Nov-09 12:08:06 Ryans-MacBook-Pro-2.local INFO [onyx.peer.task-lifecycle] - [57bc3e82-6d42-4baf-919e-bc013a969d0c] Stopping Task LifeCycle for null
Hmm, okay. We didn't touch any of the resolving code, so unlikely to be a problem there. Thinking.
PM me the full onyx.log.
@spangler: We significantly tightened up the schema checks this release. I'm thinking this caught something that was already wrong, but the error message is poor. If you send along some code, happy to look at that, too.
Stopping Task LifeCycle for null
means something blew up in the task setup, in this case resolving a symbol, and it wasn't able to grab the task name. So that's not the issue.
Symbol resolution failure.
I can help, but I need to see more.
Let's start with project.clj and your catalog.
@michaeldrogalis project.clj (at least, the deps):
(defproject aviary/peregrine "0.0.9"
:dependencies [[org.clojure/clojure "1.7.0"]
[joda-time "2.2"]
[org.clojure/core.async "0.2.371"]
[io.forward/sendgrid-clj "1.0.1"]
[aviary/cicinnurus "0.0.14"]
[hiccup-bridge "1.0.1"]
[hickory "0.5.4"]
[com.draines/postal "1.11.3"]
[org.apache.commons/commons-daemon "1.0.9"]
[aviary/condor "0.0.123"
:exclusions [joda-time com.taoensso/timbre]]
[caribou/caribou-core "0.15.5"
:exclusions [org.clojure/core.cache]]
[clojure.jdbc/clojure.jdbc-c3p0 "0.3.2"]
[environ "0.5.0"]
[hiccup "1.0.5"]
[org.onyxplatform/onyx "0.8.0-alpha1"]
[org.onyxplatform/onyx-kafka "0.8.0-alpha1"]
[littlebird-aviary/conduit "0.0.20"
:exclusions [com.taoensso/encore]]]
jvm-opts:
:jvm-opts ^:replace ["-server"
"-Xmx4g"
"-XX:-OmitStackTraceInFastThrow"
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n"]
catalog:
{:onyx/name :read-messages
:onyx/plugin :onyx.plugin.kafka/read-messages
:onyx/type :input
:onyx/medium :kafka
:kafka/topic (str topic "-input")
:kafka/group-id "onyx-consumer"
:kafka/fetch-size 307200
:kafka/chan-capacity 1000
:kafka/zookeeper zookeeper
:kafka/offset-reset :smallest
:kafka/force-reset? true
:kafka/empty-read-back-off 500
:kafka/commit-interval 500
:kafka/deserializer-fn :peregrine.onyx.tasks/kafka-deserialize
:kafka/partition 0
:onyx/min-peers 1
:onyx/max-peers 1
:onyx/batch-size 100}
{:onyx/name :fan-out
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks/fan-out}
{:onyx/name :followers
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks.profile/followers}
{:onyx/name :timeline
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks.profile/timeline}
{:onyx/name :blog
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks.profile/blog}
{:onyx/name :fullcontact
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks.profile/fullcontact}
{:onyx/name :merge-in
:onyx/type :function
:onyx/batch-size batch-size
:onyx/fn :peregrine.onyx.tasks/merge-in}
{:onyx/name :write-messages
:onyx/plugin :onyx.plugin.kafka/write-messages
:onyx/type :output
:onyx/medium :kafka
:kafka/topic (str topic "-output")
:kafka/zookeeper zookeeper
:kafka/serializer-fn :peregrine.onyx.tasks/kafka-serialize
:onyx/batch-size batch-size}
I pass in a unique topic for each job, because I saw that in your onyx-kafka examples.
We do that for testing - you can use the same topic if you like.
I don't see anything wrong here. Hm
So if two jobs get started and they are using the same topic, how are the messages distinguished?
They'd read the same sequence of messages in parallel. Kafka is agnostic to which processes are reading from it.
Is this required? :peregrine.onyx.tasks/kafka-serialize
The peregrine.onyx.tasks bit, I mean
Right, I mean if there are two different jobs using the same topic, how do messages route to only the job they are supposed to?
@lucasbradstreet Yeah, it says to provide a serialize method?
Yes, but have you required the ns that it belongs in?
@spangler: They don't. Kafka isn't designed to work that way.
@michaeldrogalis Right, so I see that being a problem with sharing topics between jobs
I mean, it sounds like it might be a semantic problem for you, but lots of people design multiple applications to read from the same log and see the same messages, without coordination. That's a feature.
@lucasbradstreet Yeah those serialize/deserialize methods are in the same namespace as the catalog actually
@michaeldrogalis Ah, so I just put some kind of job id on there perhaps?
The thing is I am using them as batch jobs, so I require the :done sentinel to know when the job has completed.
Yeah, different topic names are probably the best solution?
If two jobs are using the same topic, then they would both receive the :done when one of them completes.
Ah, I think you need to require that ns from whatever starts the peers
@spangler: Ah, gotcha. That bit was unclear to me
If you’re submitting the job in another process then there’s no guarantee it was resolved
Yeah, that is odd.
Hmm... also, how do I configure the Kafka consumer you are using in onyx-kafka? I need to set fetch.message.max.bytes to something higher.
I was thinking we should probably have an additional opts key that we merge into the config that we pass the producer/consumer
Just for anything else that we miss
@lucasbradstreet: Weird thing about that error is that the :data key is empty.
Yeah, actually, I meant to create an issue about that
I’ve seen that elsewhere
Hmm, first time I've run into it.
I’ve definitely seen it before. I wonder if it has something to do with the workaround we used to fix those swallowed exceptions
Hmm... just increased :kafka/fetch-size to 8320000, but I am still getting this exception in onyx.log:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1089614 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
We’re not actually setting that though, are we?
We’re doing a fetch request of that size
But I don’t think we configure max.request.size on the consumer
I thought max.request.size
was a server/producer option.
Ah, maybe it is
Yeah, it’s a producer option
Yeah, we have no way to configure that currently
We’d accept a patch https://github.com/onyx-platform/onyx-kafka/blob/0.8.x/src/onyx/plugin/kafka.clj#L278
Just grab it directly from the task map and put it in that config map
and I think you should be good
I think I’d also like it to take an opts map that gets merged into that config map
That would prevent us from having to support every option
Yeah, actually I wouldn't even bother with supporting both
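A sketch of the opts-map idea under discussion (the :kafka/producer-opts key and the defaults shown are hypothetical, not onyx-kafka's actual code):

```clojure
;; Sketch: merge a free-form options map from the catalog entry into the
;; config handed to the underlying producer, so keys like
;; max.request.size can be set without the plugin naming each one.
(defn build-producer-config [task-map]
  (merge {"bootstrap.servers" (:kafka/brokers task-map)
          "max.request.size"  "1048576"}         ;; assumed default
         (:kafka/producer-opts task-map)))       ;; user overrides win

(build-producer-config
 {:kafka/brokers "localhost:9092"
  :kafka/producer-opts {"max.request.size" "8320000"}})
;; => {"bootstrap.servers" "localhost:9092", "max.request.size" "8320000"}
```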
It's not evident to me that taking an opaque map from the catalog entry is the best way to go. While I agree that we don't want to be stuck supporting every switch for the consumer/producer that we're ultimately wrapping, I also don't want to make it look like it's totally transparent usage. Splitting reading across multiple machines doesn't offer the same interface that whatever we're using under the covers might be offering. It might be a more viable option to pass a map via a task lifecycle, though.
Just a thought -- anyway.
I’m OK with it as long as we’re supporting a particular consumer/producer
@michaeldrogalis Maybe it is something that can be passed in on boot up, rather than in the catalog?
It seems more like a boot up configuration option that won't be changing between jobs
That's a possibility, yeah.
I guess we could just support injecting a custom producer/consumer
Whatever clj-kafka is defaulting to.
Gosh it's weird that the :data key is absent.
We just default to what kafka defaults to.
Cool, figured it would
Cool, good to hear. Yes, put all your namespaces that need to be required in one place - that's a helpful idiom. I'll look into why the error message got swallowed, in the meantime.
Yeah, it needs to be in the same namespace as start peers, or a transitive one from that ns
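That idiom might look like the following. The namespace names come from the catalog above; the launcher namespace is hypothetical, and the onyx.api calls are a sketch from memory for 0.8, not guaranteed signatures:

```clojure
(ns peregrine.launcher
  "Single place that requires every namespace whose fns the catalog
   references by keyword, so symbol resolution succeeds on the peers."
  (:require [onyx.api]
            ;; kafka-(de)serialize, fan-out, merge-in
            [peregrine.onyx.tasks]
            ;; followers, timeline, blog, fullcontact
            [peregrine.onyx.tasks.profile]))

(defn start! [n-peers peer-config]
  (let [peer-group (onyx.api/start-peer-group peer-config)]
    (onyx.api/start-peers n-peers peer-group)))
```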
I’m not sure how I feel about all the solutions to this kafka config issue. I don’t really like injecting a producer/consumer either, since we still want most of the rest of the other lifecycle, so we’ll need users to start supplying two lifecycle calls
Will think on it, and for now add a patch to add the option @spangler is looking for.
We’re pretty much using their exact producer/consumer, and if we start allowing users to inject them then it’s not like we can easily switch anyway.
Yeah, good point
http://kafka.apache.org/documentation.html#simpleconsumerapi lots of options under producer configs
Happy to debate it later
@spangler: So you're up and running now?
Hmm... now I just got that blank :data key error four times in a row in my onyx.log!
Yeah I’m pretty sure it’s always that way
We upgraded Timbre recently. Maybe something went wrong there
It was as I thought
I broked it
I figured it was to do with this workaround https://github.com/onyx-platform/onyx/blob/0.8.x/src/onyx/static/logging_configuration.clj#L14
Aha! That thing.
Really need to get this up and running... I guess I can go back to the previous version
@spangler: it shouldn’t make it crash
but it will prevent you from getting as much info
I can push up a snapshot with that fix if you’d like
Yus. So you’ve got other issues
Give it a few mins to ci/build and a snapshot will be up
0.8.0-SNAPSHOT
once the build finishes
Sorry, needs to rebuild on master, but it’ll definitely be up in ~4
Its cached. Clojars has the latest: https://clojars.org/org.onyxplatform/onyx/versions/0.8.0-SNAPSHOT
You can pin the snapshot version.
try 0.8.0-20151109.213854-17
Okay, here is the real error then:
[{:type org.apache.kafka.common.config.ConfigException
:message "Invalid value 8320000 for configuration max.request.size: Expected value to be an number."
did you pass a string?
The code is pretty thin, we're not doing much other than passing the map to clj-kafka
maybe make it a long?
Yeah, it's talking to Scala, so maybe it's something weird like that. Shrug.
Yeah, very weird
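For what it's worth, the difference under suspicion is just the literal's type in the catalog entry (a sketch; whether Kafka's config parser accepts strings here depends on the config definition):

```clojure
;; The ConfigException above suggests the value reached Kafka's config
;; parser in a form it rejects.
{:kafka/fetch-size 8320000}     ;; a long literal
;; vs
{:kafka/fetch-size "8320000"}   ;; may trip "Expected value to be a number"
```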
Sleep time
Naturally haha
Must have tried calling parse on it and caught the exception, and they figured that's the only reason it would fail to parse.
@lucasbradstreet Exactly
Did you get everything else?
And you got other messages at the output?
Are you using the Kafka plugin's take-segments?
K. Copy it from there into your code and add some debug messages to see if it can read anything.
Yeah. Probably easier than rewriting the code to read from the topic
Also check for other exceptions in the log
Your job might still just be getting killed early.
Anyway sleep for real this time. Gl
Just a quick check. You're passing in the deserialiser fn to take segments, not the kw path to it?
Total shot in the dark
The task map takes a keyword pointing to the deserialiser function.
But take-segments takes an actual function
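Concretely (kw->fn here is a self-contained stand-in for Onyx's internal resolver, not its actual API):

```clojure
;; What the task map wants: a keyword path, resolved by Onyx on the peer.
{:kafka/deserializer-fn :peregrine.onyx.tasks/kafka-deserialize}

;; What take-segments wants: the function itself. A keyword can be
;; resolved to its var like so:
(defn kw->fn [kw]
  (let [ns-sym (symbol (namespace kw))]
    (require ns-sym)
    (ns-resolve ns-sym (symbol (name kw)))))

;; e.g. pass (kw->fn :peregrine.onyx.tasks/kafka-deserialize) to
;; take-segments instead of the bare keyword.
```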
Gotta be off too. Catch ya