Fork me on GitHub
#jackdaw
<
2021-05-02
>
finchharold02:05:35

I set this to seek-to-beginning-eager, and still it's returning count=0.

dcj02:05:17

and you know there are messages in that topic?

finchharold02:05:19

Yeah, just now I ran this command, bin/kafka-console-consumer.sh --topic app.public.chatrooms --from-beginning --bootstrap-server http://localhost:9092/

finchharold02:05:48

It should all the messages from beginning and even showed whenever I update the chatroom data...

dcj02:05:32

And you set consumer-config and topic-serde ?

finchharold02:05:42

So, the consumer is already set and is subscribed to the topic...

finchharold02:05:21

@dcj Now it is returning this poll returned, count: 19 poll returned, count: 0 poll returned, count: 0 poll returned, count: 0 poll returned, count: 0

dcj02:05:33

19 is good

dcj02:05:48

you got 19

finchharold02:05:49

Yeah so now how can I get just the latest message? like if I changed something now, then it should return only that... is that possible?

dcj02:05:18

seek to end eager

dcj02:05:43

then publish more msgs to topic

finchharold02:05:18

Here, I'm running the prt-msgs-now function and it's returning only the count....?

finchharold02:05:37

It should return the message too right, along with the count?

dcj02:05:58

the poll msg is in 1 thread what r u doing when you read the msgs out of the channel?

dcj02:05:35

in the dosync u should print value

finchharold02:05:02

Yeah, I mean I did it like this

(defn prt-msgs-now
  [{consumer :com.app.graphql.kafka/consumer}]
  (let [msgs-chan   (chan)
        consumer-thread (thread (consume-adsb-msgs msgs-chan consumer))]
    (while true
      (let [adsb-messages (<!! msgs-chan)]
        (doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]
          (println value))))))

finchharold02:05:22

It is showing the message, thank you so much @dcj.

finchharold02:05:27

Just one query though

finchharold02:05:53

I tried it this way

(defn poll-kafka
  [{consumer :com.app.graphql.kafka/consumer}]
  (doseq [{:keys [value]} (jc/poll consumer 100)]
    (prn "Got record from Kafka" value)
    )
  (.commitSync consumer))

finchharold02:05:00

Why do you think this didn't work?

finchharold02:05:28

Yeah got it, because I missed the seek?

finchharold02:05:06

Thank you again @dcj

gklijs06:05:55

I don't really get why you would want to send the messages to a channel through. It's hard to get either at least once or are most once guarantees that way imho.

finchharold12:05:45

I'm using it for graphql streamer

finchharold12:05:23

Actually I'd like to know is there a way to let the streamer know that there's something new in the kafka topic?

finchharold12:05:43

Can I do it like this...?

(with-open [consumer (jc/seek-to-end-eager consumer)]
      (while true
        (doseq [{:keys [value]} (jc/poll consumer 100)]
          (send any method here???))))
    (.commitSync consumer)

finchharold16:05:18

Can't I pole multiple times? Like run the poll-methd multiple times?

gklijs18:05:27

If you never commit, but not very efficient. I just check every message from Kafka if it should be send to one of the active subscriptions. https://github.com/gklijs/bkes-demo/blob/7d86b9b2ef2b014c0ef85b045de8559013d9e9ee/graphql-endpoint/src/nl/openweb/graphql_endpoint/transaction_service.clj#L42