2020-06-16
Hi everyone
How can I use LogAndContinue on exceptions so the stream doesn't shut down if something goes wrong?
Hey @abdullahibra, you would just set the corresponding property in the property map that gets passed to the kafka-streams constructor.
{...
"default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
...}
you mean the config of kafka-streams:
{"application.id" "hello
"bootstrap.servers" kafka-url
"cache.max.bytes.buffering" "0"
"default.deserialization.exception.handler"
"org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}
@U065JNAN8 I have done that, but the state still transitioned to SHUTDOWN because of errors
part of the trace:
-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.processor.internals.StreamThread.run StreamThread.java: 788
org.apache.kafka.streams.processor.internals.StreamThread.runLoop StreamThread.java: 819
org.apache.kafka.streams.processor.internals.StreamThread.runOnce StreamThread.java: 912
org.apache.kafka.streams.processor.internals.TaskManager.process TaskManager.java: 425
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process AssignedStreamsTasks.java: 199
org.apache.kafka.streams.processor.internals.StreamTask.process StreamTask.java: 363
org.apache.kafka.streams.processor.internals.SourceNode.process SourceNode.java: 87
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 133
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 180
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 201
org.apache.kafka.streams.processor.internals.ProcessorNode.process ProcessorNode.java: 117
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process KStreamMap.java: 42
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 133
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 180
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward ProcessorContextImpl.java: 201
org.apache.kafka.streams.processor.internals.ProcessorNode.process ProcessorNode.java: 117
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process KStreamPeek.java: 42
jackdaw.streams.lambdas.FnForeachAction/apply lambdas.clj: 30
-baafde36-3cd8-43ae-8edf-0680d911545f] All stream threads have died. The instance will be in error state and should be closed.
-baafde36-3cd8-43ae-8edf-0680d911545f-StreamThread-1] Shutdown complete
@abdullahibra I think the problem is that the error is happening after the record has been deserialized. Looks like there's a problem in the function you're using in the foreach action.
that's it
so if I catch Exception around that function and return nil, I won't get stuck again
I mean the function inside the foreach block
Yes but is that what you actually want? The exception might indicate a problem that needs to be fixed.
yeah, but that's not a restriction
thank you 🙂
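(For anyone reading along later, a minimal sketch of the try/catch idea discussed above, using jackdaw's `for-each!`. `process-record!` and `topic-config` are hypothetical stand-ins for the real foreach body and topic config.)
(require '[jackdaw.streams :as js])

;; process-record! is a hypothetical stand-in for the real foreach body;
;; jackdaw passes each record to the fn as a [key value] vector
(defn safe-foreach
  [[k v]]
  (try
    (process-record! k v)
    (catch Exception e
      ;; log and continue so the stream thread stays alive
      (println "failed to process record" k "-" (.getMessage e)))))

;; topic-config is a hypothetical jackdaw topic-config map
(defn topology [builder]
  (-> (js/kstream builder topic-config)
      (js/for-each! safe-foreach))
  builder)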
So... just wanted to announce here that I made a super simple thing for converting EDN into Avro generic records, independently of any other interfaces like Serde or KafkaAvroSerializer/Deserializer etc. This is relevant to jackdaw because the Avro serde in jackdaw is... problematic, shall we say (this is my fault, so it's not intended as a criticism of the current maintainers). With this (or something like it, since it's only ~10 lines of code), you could just set the serde to be Confluent's standard "io.confluent.kafka.streams.serdes.avro.GenericAvroSerde" and then, in your app, use as-edn and as-avro as necessary to pack/unpack records. IMO this is simpler and safer (i.e. less prone to bugs) for new projects than the Avro serde bundled into jackdaw.
https://github.com/cddr/edn-avro
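(For readers of the log, a hedged sketch of the as-edn/as-avro workflow described above. The `edn-avro.core` namespace and the `{:schema ...}` option map are assumptions, not confirmed API; check the README at the linked repo for the actual signatures.)
;; ASSUMPTION: namespace and option-map shape are guesses at the API
(require '[edn-avro.core :as avro])

(def schema
  {:type "record"
   :name "Greeting"
   :fields [{:name "message" :type "string"}]})

;; pack EDN into an Avro GenericRecord before handing it to a
;; GenericAvroSerde-backed producer or stream...
(def rec (avro/as-avro {:message "hello world"} {:schema schema}))

;; ...and unpack records coming out of the stream back into EDN
(avro/as-edn rec {:schema schema})
;; => {:message "hello world"}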