#jackdaw
2021-04-29
finchharold16:04:51

Hey, I've removed all the Docker containers and set up Kafka and ZooKeeper manually.

finchharold16:04:20

I can read the events now

bin/kafka-console-consumer.sh --topic public.app.chatrooms --from-beginning --bootstrap-server localhost:9092
This returns the updates whenever I change a chatroom title.

finchharold16:04:16

Now when I try to poll the topic like this:

(defn poll-kafka
  [{consumer :com.app.graphql.kafka/consumer}]
  (doseq [{:keys [value]} (jc/poll consumer 100)]
    (prn "Got record from Kafka" value))
  (.commitSync consumer))
It returned this:
INFO [org.apache.kafka.clients.Metadata:365] - Cluster ID: AIfhN5dkSsCmMx7QOpcIQQ
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675] - [Consumer clientId=consumer-1, groupId=app] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459] - [Consumer clientId=consumer-1, groupId=app] Revoking previously assigned partitions []
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455] - [Consumer clientId=consumer-1, groupId=app] Successfully joined group with generation 8
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290] - [Consumer clientId=consumer-1, groupId=app] Setting newly assigned partitions: public.app.chatrooms-0
nil

finchharold16:04:42

Shouldn't it be showing the messages instead of nil? There aren't any errors, though.

gklijs16:04:32

Not 100% sure, since I've never used jackdaw. But the first call to poll, or the first few, usually comes back empty, since the consumer needs to fetch metadata and connect to the partition leaders.
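
The point about early polls coming back empty can be handled by polling more than once before concluding that the topic has no data. A minimal sketch, assuming a hypothetical helper `poll-until-records` (not part of jackdaw) that takes the poll call as a zero-argument function:

```clojure
;; Hypothetical helper, not part of jackdaw: retries an arbitrary poll call.
(defn poll-until-records
  "Calls poll-fn (a zero-arg function returning a seq of records) up to
  max-attempts times. Returns the first non-empty result, or nil if every
  attempt came back empty."
  [poll-fn max-attempts]
  (loop [attempt 1]
    (let [records (poll-fn)]
      (cond
        (seq records)            records
        (< attempt max-attempts) (recur (inc attempt))
        :else                    nil))))
```

With a jackdaw consumer this might be called as `(poll-until-records #(jc/poll consumer 1000) 10)`; the timeout and attempt count are arbitrary.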

dcj17:04:21

You might want to try seeking to the beginning of the topic for your initial testing. Here is some rough code I've been using to read a topic:

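;; Assumes (:require [jackdaw.client :as jc] [clojure.core.async :refer [>!!]])
(defn consume-adsb-msgs
  [msgs-chan]
  (with-open [consumer (-> (jc/consumer consumer-config topic-serde)
                           (jc/subscribe [topic-foo])
                           ;; starts at the end of the topic; use
                           ;; jc/seek-to-beginning-eager to replay from the start
                           jc/seek-to-end-eager)]
    (while true
      (let [messages (jc/poll consumer 1000)]
        (println "poll returned, count:" (count messages))
        ;; forward non-empty batches, then commit their offsets
        (when (seq messages)
          (>!! msgs-chan messages)
          (.commitSync consumer))))))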
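
`consumer-config` is not shown in the snippet above. A rough sketch of what such a map might contain, assuming the broker address from the console-consumer command and the `app` group from the log output earlier in the thread; every value here is an assumption, not dcj's actual config:

```clojure
;; Assumed values for illustration only.
(def consumer-config
  {"bootstrap.servers"  "localhost:9092" ; broker from the console-consumer command
   "group.id"           "app"            ; group id seen in the log output
   "auto.offset.reset"  "earliest"       ; start point when no offset is committed
   "enable.auto.commit" "false"})        ; offsets are committed via .commitSync
```

`auto.offset.reset` only takes effect when the group has no committed offset for a partition, which is why an explicit seek can still be needed for a group that has already committed offsets.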

dcj18:04:02

On the other end of that channel, I doseq over the messages and process each message individually.

dcj18:04:26

Something like this:

(defn prt-msgs-now
  []
  (let [msgs-chan       (chan)
        consumer-thread (thread (consume-adsb-msgs msgs-chan))]
    (while true
      (let [adsb-messages (<!! msgs-chan)]
        (doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]