Hey @dstephens, I thought it'd be better to poll the messages and put them on a core.async channel, so the streamer can read from that channel and then send a callback. So I tried polling them inside the mutation like this:

(while true
  (let [records (jc/poll consumer 1000)]
    (doseq [{:keys [value]} records]
      (println value))
    (.commitSync consumer)))


But I'm getting this:

class clojure.lang.PersistentArrayMap cannot be cast to class org.apache.kafka.clients.consumer.Consumer (clojure.lang.PersistentArrayMap and org.apache.kafka.clients.consumer.Consumer are in unnamed module of loader 'app')
What does this mean?

Daniel Stephens 12:04:38

Pretty standard error in Clojure. It means you are calling something that expects a Consumer but passing it a map, and a map can't be cast to a Consumer. These are usually simple to debug in a REPL, since you can divide and conquer to work out where the error is coming from. In this case the failing call is a method that expects a consumer, so my guess is that you are calling poll or commitSync on a map by accident. I'd start by checking the value of consumer in your snippet.
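
Something like this at the REPL might make that check easier than printing the whole value. It's only a rough sketch: describe-value is a made-up helper (not part of jackdaw or Kafka), and the small map just stands in for whatever consumer is bound to in your code.

```clojure
;; Hypothetical helper (not from jackdaw or Kafka): summarise a value
;; instead of printing all of it.
(defn describe-value
  [x]
  {:class (class x)                       ; concrete runtime class
   :map?  (map? x)                        ; true if it is a Clojure map
   :keys  (when (map? x) (vec (keys x)))}) ; top-level keys, if any

;; A small map stands in for whatever `consumer` is currently bound to.
(describe-value {"bootstrap.servers" "localhost:9092"})

(comment
  ;; In the real project, with kafka-clients on the classpath:
  (describe-value consumer)
  (instance? org.apache.kafka.clients.consumer.Consumer consumer))
```

If :map? comes back true, then consumer is plain data (a config map, for example) rather than a Consumer object, which would match the ClassCastException.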


I printed out consumer and it returned huge stuff