Fork me on GitHub

@dcj where would the metadata come from?


you would write it explicitly in Kafka?


I might have a related question, are the ConsumerWrappers accessible from jackdaw? I want to log committed offsets


Interesting question @jgerman. Do you mean that you have some app implemented using the kafka streams DSL, and that you basically want to track the progress of the underlying consumers by logging the committed offsets as they are committed? Not sure how I'd do that. I think again it might be something for which you'd need to bust out the processor api.


yeah, I've only been looking at Streams and Jackdaw for a day or so, but in the monitoring section of the Manning book on streams there's an example of implementing interceptors (in java) to get that sort of information


public class StockTransactionConsumerInterceptor implements
 ConsumerInterceptor<Object, Object> {

    // some details left out for clarity
    private static final Logger LOG =

    public StockTransactionConsumerInterceptor() {
        ("Built StockTransactionConsumerInterceptor");

    public ConsumerRecords<Object, Object>
 (ConsumerRecords<Object, Object> consumerRecords) {
        ("Intercepted ConsumerRecords {}",
                 buildMessage(consumerRecords.iterator()));              1
        return consumerRecords;

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
         ("Commit information {}",  map);                        2


if I'm going to jump into the ProcessorAPI with jackdaw I'm off in custom interop land correct?


my intention here is to be able to report consumer lag to datadog, we've had trouble getting jmx metrics out of kafka


Ah yeah interceptors is probably a good tool to use for that purpose. You can also do them in clojure. You just need to make sure they are AOT compiled so that kafka. Hopefully jackdaw neither helps nor hinders in this area. It should just get out of your way


cool, I'll poke around and see if I can figure out how to handle it, thanks!