#jackdaw
2019-06-03
dangercoder14:06:24

Are there any guides on how to create a consumer? I currently get Missing required configuration "key.deserializer" which has no default value.

dharrigan14:06:20

Here's a snippet:

dharrigan14:06:05

(defn -main
  [& _]
  (echo-environment)
  (with-open [consumer (-> (jackdaw-client/consumer consumer-config)
                           (jackdaw-client/subscribe [my-topic]))]
    (doseq [message (jackdaw-log/log consumer (-> env :topic :poll-interval-ms))]
      (process/foreach message))))

dharrigan14:06:15

(def consumer-config
  (merge (-> env :broker)
         {"key.deserializer" (.getName StringDeserializer)
          "value.deserializer" (.getName StringDeserializer)}))

dharrigan14:06:55

  (:require [my-foo-process :as process]
            [jackdaw.client.log :as jackdaw-log]
            [jackdaw.client :as jackdaw-client]
            [jackdaw.serdes.json :as json-serde]
            [config.core :refer [env]]
            [clojure.pprint :refer [pprint]])
  (:import [org.apache.kafka.common.serialization Serdes StringDeserializer])
  (:gen-class))

dharrigan14:06:36

I use config.core to read values such as :topic, :poll-interval-ms and :broker from a config.edn
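
For readers following along, a config.edn matching the snippets above might look roughly like this. The shape is inferred from the code (`(-> env :broker)` is merged into the consumer config, so it is assumed to be a map of Kafka property names, and `(-> env :topic :poll-interval-ms)` implies the interval is nested under :topic). All values are hypothetical placeholders, not the ones used in the original program.

```clojure
;; config.edn (hypothetical values, shape inferred from the snippets above)
{:broker {"bootstrap.servers" "localhost:9092"      ; assumed broker address
          "group.id"          "my-consumer-group"}  ; assumed consumer group
 :topic  {:poll-interval-ms 1000}}                  ; assumed poll interval
```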

dharrigan14:06:56

hope that helps to get you on the path

dangercoder14:06:07

Ah yeah, thanks for the example. ✌️

dharrigan14:06:22

it's a bit bitty, since it's copy&pasta from a program in use, so I can't show company stuff

dharrigan14:06:28

but it's a generic enough setup

dangercoder14:06:47

Still not 100% sure about when to use value-serdes and key-serdes in the client.consumer api though 😄

dangercoder14:06:17

I know the Java API for Kafka, it's just something I don't want to use 🙂

dangercoder14:06:17

nvm I figured it out now
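
For anyone with the same question, here is a minimal sketch of the serde-based alternative to setting "key.deserializer" and "value.deserializer" by hand. It assumes `jackdaw.client/consumer` accepts a second argument, a map with `:key-serde` and `:value-serde`, and takes the deserializers from those serdes. The namespace name, broker address, group id and the choice of a JSON value serde are hypothetical and only for illustration.

```clojure
(ns example.consumer
  (:require [jackdaw.client :as jackdaw-client]
            [jackdaw.serdes.json :as json-serde])
  (:import [org.apache.kafka.common.serialization Serdes]))

;; Hypothetical connection settings. Note there are no "*.deserializer"
;; entries here; the serdes below are assumed to supply them.
(def consumer-config
  {"bootstrap.servers" "localhost:9092"
   "group.id"          "example-group"})

;; String keys and JSON values, chosen for illustration only.
(def serdes
  {:key-serde   (Serdes/String)
   :value-serde (json-serde/serde)})

(defn make-consumer
  "Builds a consumer whose deserializers come from `serdes`."
  []
  (jackdaw-client/consumer consumer-config serdes))
```

With string deserializer class names in the config map, as in the snippet earlier in the thread, the one-argument `(jackdaw-client/consumer consumer-config)` call should be enough. The two-argument form is useful when you already have serde objects to hand.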

dangercoder14:06:56

Do you process your messages "async" with core.async @dharrigan?

dharrigan14:06:47

no need to complicate things for now, works fine 😉