Fork me on GitHub

Hey I got it, thank you very much @gphilipp, @dstephens, and @ddouglass.


As I was using the Debezium connector, it will create topics for all tables in the database. So, I can directly refer to those.


The main reason I got confused was that I was using integrate to setup the whole thing and that kinda got messed up!


Now jackdaw.client/subscribe will just subscribe the given consumer to a specified topic and returns the consumer right? So, how to print the consumed data?

Daniel Stephens11:04:39

when using the consumer patter you usually start a thread that repeatedly polls the consumer, if you want to avoid all the complexities of that, using kafka streams handles a lot of the thread management for you


Oh. Basically, I'm trying to use the consumer inside the graphql streamer. Like just if the consumer has some data then just send a callback function, nothing more. So, any suggestions on how to do this? just if the consumer has data then send a callback...

Daniel Stephens14:04:14

Is this to serve subscriptions? That's my only understanding of graphql streamers, in which case I would do exactly as my previous comments says, and use a thread to poll and consumer and push messages down any open web sockets


Yeah for subscriptions... I wrote a method for polling...

(defmethod ig/init-key ::should-poll? [_ should-poll?]
  (volatile! should-poll?))

(defmethod ig/halt-key! ::should-poll? [_ should-poll?]
  (vreset! should-poll? false))

(defmethod ig/prep-key ::poll-timeout [_ timeout]
  (read-string (or timeout "100")))

(defmethod ig/init-key ::poll-timeout [_ timeout]

(defn start-polling
  (let [consumer (::consumer system)
        poll-timeout (::poll-timeout system)
        should-poll? (::should-poll? system)]
    (timbre/info "Starting message polling" poll-timeout @should-poll?)
    (while (true? @should-poll?)
      (doseq [{topic :topic-name :as record} (jc/poll consumer
        ;; Process message synchronous
        (channel/process-event! {:topic (->kebab-case-keyword topic)
                                 :system system
                                 :data record})))))
Now how do I use this for graphql subscriptions? like should I just call this method...?

Daniel Stephens15:04:09

pretty much, it's a bit involved as there's lots of moving parts so you might have to work out the smaller details. There's basically one big issue with what you are doing now, that if you have multiple subscribers then they share a consumer and so they will get only a share of all the messages. The solution I take to this is to take everything from the consumer and put messages onto another event-bus that you can subscribe multiple consumers to, I use manifold. Then in start polling rather than referencing the consumer you reference the manifold event-bus and subscribe to that. An alternative might be to create a new consumer for each subscription with a new group-id, but I expect this would be slow


Yeah that's what I just thought. I currently have a single consumer and there are 3 topics and 3 streamers so if I just do poll inside the 3 streamers then how can a streamer know when to trigger if any of those 3 topics have changed...?


Each streamer have to get triggered the moment the respective topic it is related to gets updated.

Daniel Stephens16:04:47

when the streamer function gets called, you begin consuming continuously, so it always gets everything that happens whilst the streamer is running.


Yeah so can't we filter it? like lets say 1-5 no's belong to topic A, 5-10 belong to topic B. There's a streamer for A and streamer for B. Now, if we just use the poll method then it shows whatever the topic has changed last right? so now if both he streamers are running and we have a poll method inside then if topic A changes then both the streamers may get triggered an instead of only A... so can't filter it like when topic A gets data change then streamer A should trigger and similarly for B.


I just tried calling the above start-polling method inside the streamer function and now when I run the subscription, I'm getting this:

An error occurred in subscription.
Error: Cannot invoke "java.util.concurrent.Future.get()" because "fut" is null


I understand the issue here, I was running the poll inside the streamer so it kept calling every time I run the subscription...


I guess it'd be better to poll and then put the data into a core.async channel


Have you guys tried Kafka with graphql?

👍 2

Yes, several poc's and the place I'm currently at also kind of combines them, but with a traditional dB in the middle. See or also there is a blog in the works in cooperation with Confluent on exactly this.