jackdaw

Alastair Hole 2023-07-13T13:25:41.870419Z

I wanted to exit a Kafka Streams application on error e.g. when the producer fails to find a valid schema if auto schema registration is off (so that k8s can fail noisily/restart the pod). I assumed the app would exit if the streams app fails but this appears not to be the case, it just closes the client and sits there. To achieve this I used a state listener:

(:require [clojure.tools.logging :refer [error]]
          [jackdaw.streams :as js])
(:import  [org.apache.kafka.streams KafkaStreams$StateListener KafkaStreams$State])

(def state-listener
  (reify KafkaStreams$StateListener
    (onChange [_this new-state _old-state]
      (when (= new-state KafkaStreams$State/ERROR)
        (error "Exiting")
        (System/exit 1)))))

....

(.setStateListener app state-listener)
(js/start app)
I share this here partly as it may be useful as a reference for others, but I also welcome any feedback in case this is a terrible idea and there is a better way to achieve the same. I did look at doing it via a StreamsUncaughtExceptionHandler but it seems if I rely on the default exception handler then it will cleanly close the client with a StreamThreadExceptionResponse of SHUTDOWN_CLIENT and then transition to error state, where the app can then be exited cleanly.