#jackdaw
2020-06-16
abdullahibra 10:06:44

How can I use LogAndContinue on exceptions so the stream doesn't shut down if something goes wrong?

cddr 11:06:50

Hey @abdullahibra, you would just set the corresponding property in the property map that gets passed to the kafka-streams constructor.

cddr 11:06:19

{...
 "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
 ...}
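
A minimal sketch of where that property map ends up, assuming jackdaw's streams API; build-topology! here is a hypothetical placeholder, not something from the thread:

(require '[jackdaw.streams :as js])

(def app-config
  {"application.id" "my-app"
   "bootstrap.servers" "localhost:9092"
   ;; log and skip records that fail to deserialize instead of
   ;; crashing the stream thread
   "default.deserialization.exception.handler"
   "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})

(let [builder (js/streams-builder)]
  (build-topology! builder)  ;; hypothetical fn that wires up the topology
  (js/start! (js/kafka-streams builder app-config)))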

abdullahibra 11:06:45

You mean in the kafka-streams config:

{"application.id" "hello
   "bootstrap.servers" kafka-url
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" 
   "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}

cddr 11:06:45

yep exactly

👍 3
abdullahibra 11:06:09

@U065JNAN8 I have done that, but the state still transitioned to SHUTDOWN because of errors

abdullahibra 11:06:08

Some of the trace:

abdullahibra 11:06:12

-StreamThread-1] Failed to process stream task 0_0 due to the following error:
                      org.apache.kafka.streams.processor.internals.StreamThread.run          StreamThread.java:  788
                  org.apache.kafka.streams.processor.internals.StreamThread.runLoop          StreamThread.java:  819
                  org.apache.kafka.streams.processor.internals.StreamThread.runOnce          StreamThread.java:  912
                   org.apache.kafka.streams.processor.internals.TaskManager.process           TaskManager.java:  425
          org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process  AssignedStreamsTasks.java:  199
                    org.apache.kafka.streams.processor.internals.StreamTask.process            StreamTask.java:  363
                    org.apache.kafka.streams.processor.internals.SourceNode.process            SourceNode.java:   87
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  133
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  180
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  201
                 org.apache.kafka.streams.processor.internals.ProcessorNode.process         ProcessorNode.java:  117
  org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process            KStreamMap.java:   42
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  133
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  180
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  201
                 org.apache.kafka.streams.processor.internals.ProcessorNode.process         ProcessorNode.java:  117
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process           KStreamPeek.java:   42
                                      jackdaw.streams.lambdas.FnForeachAction/apply                lambdas.clj:   30

abdullahibra 11:06:21

-baafde36-3cd8-43ae-8edf-0680d911545f] All stream threads have died. The instance will be in error state and should be closed.
-baafde36-3cd8-43ae-8edf-0680d911545f-StreamThread-1] Shutdown complete

cddr 12:06:57

@abdullahibra I think the problem is that the error is happening after the record has been deserialized. Looks like there's a problem in the function you're using in the foreach action.

abdullahibra 12:06:44

So if I wrap that function in a try/catch that returns nil on Exception, I won't get stuck again?

abdullahibra 12:06:57

I mean the function inside the foreach block.

cddr 12:06:31

Yes, but is that what you actually want? The exception might indicate a problem that needs to be fixed.
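
A minimal sketch of that catch-and-continue approach, assuming jackdaw's for-each! and a hypothetical process-record! side effect (the stack trace above shows the foreach fn is invoked via jackdaw.streams.lambdas.FnForeachAction with a [key value] pair):

(require '[clojure.tools.logging :as log]
         '[jackdaw.streams :as js])

;; stream is the KStream built earlier; process-record! stands in for the
;; side-effecting fn in the foreach block. A failure on one record is
;; logged and swallowed so the stream thread keeps running.
(js/for-each! stream
              (fn [[k v]]
                (try
                  (process-record! k v)
                  (catch Exception e
                    (log/error e "Skipping record" {:key k})))))

As cddr notes, this trades crash-on-error for silently skipping records, so it is only appropriate when losing individual records is acceptable.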

abdullahibra 12:06:56

yeah, but that's not a restriction

abdullahibra 12:06:00

thank you 🙂

cddr 11:06:53

So... just wanted to announce here that I made a super simple thing for converting EDN into Avro generic records independently of any other interfaces like Serde or KafkaAvroSerializer/Deserializer etc. This is relevant to jackdaw because the Avro serde in jackdaw is... problematic, shall we say (this is my fault, so it's not intended as a criticism of the current maintainers). With this (or something like it, since it's only ~10 lines of code), you could just set the serde to Confluent's standard "io.confluent.kafka.streams.serdes.avro.GenericAvroSerde" and then, in your app, use as-edn and as-avro as necessary to pack/unpack records. IMO this is simpler and safer (i.e. less prone to bugs) for new projects than the Avro serde bundled into jackdaw. https://github.com/cddr/edn-avro

💯 3
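
A minimal sketch of that workflow; the as-edn/as-avro names come from the announcement, but the edn-avro namespace and argument shapes here are assumptions:

(require '[edn-avro.core :as ea]) ;; namespace alias is an assumption

(def schema
  {:type "record"
   :name "Greeting"
   :fields [{:name "message" :type "string"}]})

;; EDN -> org.apache.avro.generic.GenericRecord, ready for Confluent's
;; GenericAvroSerde on the producing side
(def rec (ea/as-avro {:message "hello"} {:schema schema}))

;; GenericRecord -> EDN on the consuming side
(ea/as-edn rec {:schema schema})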