#jackdaw
2021-05-11
finchharold09:05:42

Has anyone tried the poll method with Integrant?

gphilipp09:05:03

Integrant doesn’t really care about what you do in your components when they’re started, it just cares about how to start and stop them.

finchharold10:05:21

Yeah, but starting the polling with Integrant is stopping the rest of the process.

finchharold10:05:09

Like, as soon as I start the app, all I can see is the poll method running, and I can't apply the GraphQL mutations and subscriptions because it says connection refused (not running), and the REPL is hung!

Darin Douglass10:05:28

Polling is a blocking operation. Are you spinning up and polling the consumer in the main thread? https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

finchharold10:05:32

I mean, I'm running it with integrant, so yeah maybe it is running in the main thread!

finchharold10:05:24

this is the method:

(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
  (timbre/info "Starting polling")
  (with-open [consumer (jc/seek-to-end-eager consumer)]
    (while true
      (doseq [{:keys [value]} (jackclient/poll consumer 100)]
        (>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
  (.commitSync consumer))

Darin Douglass10:05:14

Ya that'll block the main thread. I’m fairly certain integrant won’t wrap states in threads

finchharold10:05:43

I'm guessing the other issue is that it doesn’t return anything...? Like, the init-key method needs to return something, right? So that it can be the value for the key?

finchharold10:05:10

It just polls the value and puts it in a channel...

Darin Douglass11:05:04

The value can (and likely should) be a thread so you can shut it down on halt

Darin Douglass11:05:41

Or a go-loop or whatever concurrency primitive you choose to use

finchharold11:05:32

I've tried it two ways:

(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
  (timbre/info "Starting polling")
  (with-open [consumer (jc/seek-to-end-eager consumer)]
    (future
      (while true
        (doseq [{:keys [value]} (jc/poll consumer 100)]
          (>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
   ); should future end here?
  (.commitSync consumer))

This way the polling runs in the background and doesn't block the main thread, i.e. the REPL is functional, but it isn't actually putting anything in the channel; if I add a print statement there, it doesn't even show up. The other way, I replaced future with thread and got an exception saying Kafka is not safe for multi-threaded access.

Darin Douglass12:05:40

That is correct, Kafka consumers are not thread safe, so you’d want to move the consumer creation into the future

Darin Douglass12:05:58

Which means NOT getting the consumer from another init-key
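
A minimal sketch of that suggestion, using only clojure.core so it runs without Kafka. The names `start-poller!`, `make-consumer`, `poll-fn`, `handle!` and `running` are hypothetical and not part of jackdaw or Integrant; `make-consumer` stands in for whatever builds and subscribes a consumer from the config. Note that wrapping `future` in `with-open`, as in the attempt above, closes the consumer as soon as the future has been created, because `future` returns immediately; here `with-open` sits inside the future instead.

```clojure
(defn start-poller!
  "Starts a background future that creates its own consumer, polls it while
  @running is truthy, and closes it when the loop ends.
  All four arguments are hypothetical stand-ins:
    make-consumer - zero-arg fn returning something with a .close method
    poll-fn       - fn of the consumer returning a seq of records
    handle!       - fn called with each record
    running       - an atom holding true/false"
  [make-consumer poll-fn handle! running]
  (future
    ;; The consumer is created, used and closed on this one thread,
    ;; so no other thread ever touches it.
    (with-open [consumer (make-consumer)]
      (while @running
        (doseq [record (poll-fn consumer)]
          (handle! record))))))
```

The returned future is what init-key could hand back, and flipping `running` to false lets the loop finish and the consumer close.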

finchharold12:05:43

So, how do I get the consumer then?

finchharold12:05:22

I use a prep-key method for the consumer config, then pass the config into an init-key method, where I get the consumer to subscribe to the topics and ultimately return it.

gklijs12:05:25

What about keeping the config in the init-key, and using that to create the consumer?

finchharold12:05:34

No, the config comes from prep-key

gphilipp12:05:04

Your init-key code should call something like this (start-consumer!) fn:

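;; Assumes (:import (java.time Duration)) in the ns form.
(defn- poll!
  [consumer producer running]
  ;; Keep polling until the `running` flag is flipped to false.
  (while @running
    (let [records (jd/poll consumer (Duration/ofMillis 1000))]
      (doseq [{:keys [value]} records]
        ;; do stuff with value
        )
      ;; Commit offsets once the whole batch has been handled.
      (.commitSync consumer))))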


(defn- start-consumer! [consumer producer topic-config running]
  (let [consumer-thread (Thread. #(poll! consumer producer running))]
    (jd/subscribe consumer [topic-config])
    (.start consumer-thread)
    consumer-thread))

gphilipp12:05:08

So Integrant will keep this Thread object under the system key you’ve associated it with.

gphilipp12:05:09

In case you want to stop the system, you need to get hold of that object via the halt-key! lifecycle fn and stop the thread.
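
A rough sketch of that lifecycle, assuming Integrant is on the classpath. The `::poller` key, the `:on-tick` config entry and `poll-loop!` are made up for illustration, and `on-tick` stands in for the real Kafka poll so the example runs without a broker. Rather than stopping the thread forcibly, it flips a flag and waits for the loop to exit.

```clojure
(ns example.poller
  (:require [integrant.core :as ig]))

(defn- poll-loop!
  "Calls on-tick repeatedly until the running atom becomes false."
  [running on-tick]
  (while @running
    (on-tick)
    (Thread/sleep 10)))

;; init-key starts the background thread and returns immediately.
;; The returned map is what Integrant stores under ::poller.
(defmethod ig/init-key ::poller [_ {:keys [on-tick]}]
  (let [running (atom true)
        thread  (Thread. ^Runnable #(poll-loop! running on-tick))]
    (.start thread)
    {:thread thread :running running}))

;; halt-key! receives that same map, asks the loop to stop and
;; waits up to five seconds for the thread to finish.
(defmethod ig/halt-key! ::poller [_ {:keys [thread running]}]
  (reset! running false)
  (.join ^Thread thread 5000))
```

With this in place, `(ig/init {::poller {:on-tick f}})` returns without blocking the REPL, and `(ig/halt! system)` ends the loop.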

finchharold12:05:17

consumer init-key method?

finchharold12:05:38

How can the consumer init-key method call start-consumer! method? I mean consumer init-key method is the one that is returning the consumer...

gphilipp12:05:07

Your init-key function will call the start-consumer! fn, which will start the background thread and return the Thread object. Integrant will just keep that in its system map, ready for you to look up whenever you want to stop it (or do anything else with that Thread object).

finchharold12:05:57

Yeah but start-consumer! needs the consumer as an argument...? but init-key will return the consumer...

gphilipp13:05:21

Yeah, but they are different. There’s the one that sits at the system (Integrant) level: it’s what we call a system component, and you should name it differently, e.g. GraphqlExposer. Under the hood it uses one (or maybe many) Kafka consumers.

gphilipp13:05:13

You can read https://github.com/stuartsierra/component to understand a bit more on those components

gphilipp13:05:43

Integrant is just another take on handling component lifecycles.