I wanted to exit a Kafka Streams application on error e.g. when the producer fails to find a valid schema if auto schema registration is off (so that k8s can fail noisily/restart the pod). I assumed the app would exit if the streams app fails but this appears not to be the case, it just closes the client and sits there. To achieve this I used a state listener:
(:require [clojure.tools.logging :refer [error]]
[jackdaw.streams :as js])
(:import [org.apache.kafka.streams KafkaStreams$StateListener KafkaStreams$State])
(def state-listener
(reify KafkaStreams$StateListener
(onChange [_this new-state _old-state]
(when (= new-state KafkaStreams$State/ERROR)
(error "Exiting")
(System/exit 1)))))
....
(.setStateListener app state-listener)
(js/start app)
I share this here partly as it may be useful as a reference for others, but I also welcome any feedback in case this is a terrible idea and there is a better way to achieve the same. I did look at doing it via a StreamsUncaughtExceptionHandler but it seems if I rely on the default exception handler then it will cleanly close the client with a StreamThreadExceptionResponse of SHUTDOWN_CLIENT and then transition to error state, where the app can then be exited cleanly.