Fork me on GitHub

Hey, I've removed all the docker containers and set the kafka and zookeeper everything manually.


I can read the events now

bin/ --topic --from-beginning --bootstrap-server localhost:9092
this is returning the updates whenever I change a chatroom title


Now when I try to poll the topic like this:

(defn poll-kafka
  (doseq [{:keys [value]} (jc/poll consumer 100)]
    (prn "Got record from Kafka" value)
  (.commitSync consumer))
It returned this:
INFO [org.apache.kafka.clients.Metadata:365] - Cluster ID: AIfhN5dkSsCmMx7QOpcIQQ
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675] - [Consumer clientId=consumer-1, groupId=app] Discovered group coordinator (id: 2147483647 rack: null)
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459] - [Consumer clientId=consumer-1, groupId=app] Revoking previously assigned partitions []
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455] - [Consumer clientId=consumer-1, groupId=app] Successfully joined group with generation 8
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290] - [Consumer clientId=consumer-1, groupId=app] Setting newly assigned partitions:


Shouldn't it be showing the messages? instead of nil? I mean there aren't any errors though.


Not 100% sure, since I never used jackdaw. But the first or first few times calling poll usually end up empty, since it needs to fetch meta data + connect to the leaders.


You might want to try seeking to the beginning of the topic for your initial testing. Here is some rough code I've been using to read a topic:

(defn consume-adsb-msgs
  (with-open [consumer (-> (jc/consumer consumer-config topic-serde)
                              (jc/subscribe [topic-foo])
    (while true
      (let [messages (jc/poll consumer 1000)]
        (println "poll returned, count:" (count messages))
        (when (and messages (not (empty? messages)))
          (>!! msgs-chan messages)
          (.commitSync consumer))))))


On the other end of that channel, I doseq on the messages, and process each message indivdually


something like this:

(defn prt-msgs-now
  (let [msgs-chan   (chan)
        consumer-thread (thread (consume-adsb-msgs msgs-chan))]
    (while true
      (let [adsb-messages (<!! msgs-chan)]
        (doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]