#onyx
2017-05-26
stephenmhopper15:05:39

Hello again, everybody. I’m using the Onyx SQS plugin with Onyx 0.10.0-beta17. I’m running into an issue when adding an SQS input task to my job. It’s claiming that I have keywords that are namespaced with things other than :onyx or :sqs, but my task map looks like this:

{:sqs/queue-name "my queue"
 :onyx/plugin :onyx.plugin.sqs-input/input
 :onyx/medium :sqs
 :onyx/batch-timeout 1000
 :onyx/type :input
 :sqs/attribute-names []
 :onyx/name :in
 :sqs/message-attribute-names []
 :sqs/deserializer-fn :clojure.edn/read-string
 :sqs/idle-backoff-ms 1000
 :onyx/doc "Reads segments from an SQS queue"
 :sqs/region "us-east-1"
 :onyx/batch-size 2}

stephenmhopper15:05:18

The SQS plugin has this schema definition:

(def SQSInputTaskMap
  {(s/optional-key :sqs/queue-name) s/Str
   (s/optional-key :sqs/queue-url) s/Str
   (s/optional-key :onyx/batch-timeout) batch-timeout-check
   (s/optional-key :sqs/attribute-names) [s/Str]
   (s/optional-key :sqs/message-attribute-names) [s/Str]
   :sqs/region s/Str
   :onyx/batch-size max-batch-size
   :sqs/deserializer-fn os/NamespacedKeyword
   (os/restricted-ns :sqs) s/Any})

Does the (os/restricted-ns :sqs) s/Any prohibit other :onyx entries like :onyx/doc?

michaeldrogalis16:05:32

It should only be restricting sqs/ keys

stephenmhopper16:05:06

Oh, does that mean that I have an extraneous :sqs/something entry?

michaeldrogalis16:05:28

@stephenmhopper I’d have to see the stacktrace, but that’s my guess, yeah

michaeldrogalis16:05:46

Is :sqs/deserializer-fn a thing?

michaeldrogalis16:05:51

It’s been a long time since I’ve looked at the plugin

stephenmhopper16:05:24

Ah, the :sqs/idle-backoff-ms entry shouldn’t be there. I removed it, and now it works. It was confusing because the error message seemed to say that all of my entries had to be namespaced with either :sqs or :onyx, so I was looking for the wrong thing.
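
A quick way to spot this kind of stray key, as a rough sketch in plain Clojure rather than anything the plugin provides: collect the :sqs-namespaced keys in the task map and subtract the ones the schema quoted above names. The names `allowed-sqs-keys` and `extraneous-sqs-keys` are hypothetical helpers, and the allowed set is an assumption copied from the SQSInputTaskMap snippet in this conversation, so it may not match other plugin versions.

```clojure
;; Assumption: the allowed :sqs keys are exactly those listed in the
;; SQSInputTaskMap schema pasted earlier in this conversation.
(def allowed-sqs-keys
  #{:sqs/queue-name :sqs/queue-url :sqs/attribute-names
    :sqs/message-attribute-names :sqs/region :sqs/deserializer-fn})

;; Hypothetical helper: returns the set of :sqs-namespaced keys
;; in task-map that are not in allowed-sqs-keys.
(defn extraneous-sqs-keys [task-map]
  (->> (keys task-map)
       (filter #(= "sqs" (namespace %)))
       (remove allowed-sqs-keys)
       set))

;; The task map from the start of this conversation.
(def task-map
  {:sqs/queue-name "my queue"
   :onyx/plugin :onyx.plugin.sqs-input/input
   :onyx/medium :sqs
   :onyx/batch-timeout 1000
   :onyx/type :input
   :sqs/attribute-names []
   :onyx/name :in
   :sqs/message-attribute-names []
   :sqs/deserializer-fn :clojure.edn/read-string
   :sqs/idle-backoff-ms 1000
   :onyx/doc "Reads segments from an SQS queue"
   :sqs/region "us-east-1"
   :onyx/batch-size 2})

(extraneous-sqs-keys task-map)
;; => #{:sqs/idle-backoff-ms}
```

The :onyx keys are ignored by the filter, which matches the point made earlier that only sqs/ keys are restricted.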

stephenmhopper16:05:32

It looks like :sqs/idle-backoff-ms is still in the readme even though the schema doesn’t allow it, so I’ve submitted an issue on GitHub to update one or the other.

michaeldrogalis16:05:59

@stephenmhopper Thanks! We’ll fix that up. Yeah, Schema’s error output isn’t great. 😕 Looking forward to switching it all over to Spec soon

stephenmhopper16:05:26

That’s great to hear. I recently started switching all of my apps over from Schema to Spec and I love it

lucasbradstreet16:05:38

@stephenmhopper thanks, it was an issue in the docs

lucasbradstreet16:05:43

I’ve removed it from the README

stephenmhopper20:05:31

@lucasbradstreet Does the Amazon SQS plugin have some way to delete messages after they’ve been successfully processed? Right now, it looks like if I write a message to a queue and then set up a separate job to read that message, it will just continually read that message. The job I’ve set up just goes :sqs-input -> :identity -> :core.async-out

stephenmhopper20:05:51

Do I need to add another task for deleting the message part way through my job?

lucasbradstreet20:05:01

hmm, might be a bug with the 0.10 port

lucasbradstreet20:05:09

it should automatically clear it once it’s acknowledged

stephenmhopper21:05:16

The logs just contain INFO statements about checkpointing. It’ll have six consecutive “Checkpointed input” statements followed by a single “Checkpointed output” statement.

stephenmhopper21:05:51

@lucasbradstreet Is there something else I should be looking at to help troubleshoot this one?

twashing22:05:27

Does Onyx handle the case where I have some input from :input-1 that goes to :process-1.

twashing22:05:31

Then from there, we have 1 immediate output to :output-1.

twashing22:05:33

And a continuous stream of data to output-2?

twashing22:05:39

[[:input-1 :process-1]
 [:process-1 :output-1]
 [:process-1 :output-2]]

lucasbradstreet22:05:14

@stephenmhopper looking into it myself. Will be in touch.

lucasbradstreet22:05:42

@twashing output-1 and output-2 will receive equivalent streams there

lucasbradstreet22:05:49

process-1 will message to both of them

twashing22:05:44

So then I need something that consumes process-1 outputs to output-1 … then an asynchronous long-running process that writes to output-2.

lucasbradstreet22:05:21

Oh you’re asking if we have something to do that. Hmm

lucasbradstreet22:05:36

Not sure what the use case is there. That might help

lucasbradstreet22:05:48

You might be able to do it with a trigger and trigger/emit

twashing22:05:50

I have a feeling that that long-running stream simply writes to kafka (in this case), outside of Onyx.

twashing22:05:20

Umm, the use case is that a command kicks off a subscription to a long-running feed.

twashing22:05:05

But I think I can just use Onyx to issue the command and get the command result.

twashing22:05:24

… that long-running feed could just be streamed to Kafka directly.

lucasbradstreet22:05:31

Yeah, not sure how that would best be performed. I’m sure you can do it but it might be a little awkward. A custom trigger would do it for sure.

lucasbradstreet22:05:52

@stephenmhopper found the bug in onyx sqs. Will have a fix for you shortly

stephenmhopper22:05:46

@lucasbradstreet cool, thank you! I just confirmed that if I delete the message manually on output, the problem goes away, but that was definitely a hack. What was the issue?

stephenmhopper22:05:33

It looked like both checkpoint and checkpointed! weren’t doing much of anything. Was that it?

twashing22:05:13

@lucasbradstreet Ok, I’ll dig into it. Thanks.

lucasbradstreet22:05:14

synced should have been deleting the messages for that epoch, but epoch was a volatile, so it needed to be dereffed

lucasbradstreet22:05:31

technically the call should be in checkpointed!, but onyx isn’t calling it yet.

lucasbradstreet22:05:41

I’ll try to get that happening soon

lucasbradstreet22:05:52

I’ll have a snapshot build for you shortly

lucasbradstreet22:05:03

this plugin wasn’t super well tested since 0.10

lucasbradstreet22:05:10

it’s been used quite a lot with 0.9 though

stephenmhopper22:05:11

Ah, so was the (get @processing epoch) bit just returning nil?
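
A minimal sketch of how that lookup could come back nil, assuming `processing` is an atom holding a map keyed by epoch number and `epoch` is a volatile. The values and the message contents are made up for illustration, and this is not the plugin's actual code.

```clojure
;; Assumption: processing maps an epoch number to the messages read in it.
(def processing (atom {1 ["msg-a" "msg-b"]}))

;; Assumption: the current epoch is held in a volatile.
(def epoch (volatile! 1))

;; Looks up the volatile object itself, which is not a key in the map.
(get @processing epoch)
;; => nil

;; Derefs the volatile first, so the lookup uses the number 1.
(get @processing @epoch)
;; => ["msg-a" "msg-b"]
```

With the first form there would be nothing to delete for the epoch, which would fit the symptom of messages being read over and over.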

lucasbradstreet22:05:39

the test should really drain the input queue to make sure everything was deleted

lucasbradstreet22:05:51

If you’d like to give me a PR for that it’d be appreciated. I don’t have the time atm

stephenmhopper22:05:09

A PR for the test or for the epoch thing?

stephenmhopper22:05:43

Sure, I’ll see if I can do something with it within the next week

lucasbradstreet22:05:06

basically drain the input queue in here https://github.com/onyx-platform/onyx-amazon-sqs/blob/0.10.x/test/onyx/plugin/sqs_input_test.clj#L85 to make sure it doesn’t have anything on it

lucasbradstreet22:05:12

actually I might as well just do it now. don’t worry about it

stephenmhopper22:05:50

okay, thanks again!