Fork me on GitHub
#jackdaw
<
2020-04-30
>
abdullahibra14:04:18

I have tried the basic example of a simple topology, but I didn't get any results printed.

(ns hello.text
  (:require [jackdaw.streams :as js]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes :refer [string-serde]]
            [jackdaw.serdes.json :refer [serde]]
            [jackdaw.serdes.resolver :as resolver]))

;; Configuration map handed to the Kafka Streams application; it is also used
;; as the base consumer configuration in view-messages.
;; NOTE(review): kafka-url is not defined in this snippet -- it must be
;; defined before this form is evaluated.
(def kafka-config
  {"application.id" "hello"
   "bootstrap.servers" kafka-url
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})

;; Serdes describe how Kafka serializes and deserializes records.
;; Keys use the string serde; values are kept as JSON.
(def serdes
  (let [key-serde   (string-serde)
        value-serde (serde)]
    {:key-serde   key-serde
     :value-serde value-serde}))

;; Every topic is described by a config map; the key to pay attention to is
;; :topic-name. The serdes entries are added on top of the topic settings.
(def source-topic
  (conj {:topic-name "source"
         :partition-count 1
         :replication-factor 1
         :topic-config {}}
        serdes))

;; Config map for the "dest" topic: the topic settings with the serdes
;; entries added on top.
(def dest-topic
  (into {:topic-name "dest"
         :partition-count 1
         :replication-factor 1
         :topic-config {}}
        serdes))

(defn simple-topology
  "Creates a stream over `source-topic` on `builder` and registers a
  for-each! callback that prints the second element of each record pair
  with `prn`. Returns the result of `js/for-each!`."
  [builder]
  (let [source-stream (js/kstream builder source-topic)]
    (js/for-each! source-stream
                  (fn [[_ msg]]
                    (prn msg)))))

(defn view-messages
  "View the messages on the given topic.

  Opens a consumer subscribed to `topic` under a freshly generated random
  `group.id`, seeks to the beginning, and returns the fully realized
  sequence of `:value`s from the records yielded by
  `jcl/log-until-inactivity`. The consumer is closed on exit."
  [topic]
  (with-open [consumer (jc/subscribed-consumer
                        (assoc kafka-config "group.id" (str (java.util.UUID/randomUUID)))
                        [topic])]
    (jc/seek-to-beginning-eager consumer)
    ;; doall realizes the lazy seq from map before with-open closes the consumer.
    (->> (jcl/log-until-inactivity consumer 100)
         (map :value)
         doall)))

(defn start!
  "Starts the simple topology.

  Creates a streams builder, applies `simple-topology` to it, builds a
  Kafka Streams application from the builder and `kafka-config`, starts it,
  and returns the application object (which `stop!` accepts)."
  []
  (let [builder (js/streams-builder)]
    (simple-topology builder)
    ;; doto returns the application itself rather than the result of js/start.
    (doto (js/kafka-streams builder kafka-config)
      (js/start))))

(defn stop!
  "Stops the given KafkaStreams application by calling `js/close` on it."
  [kafka-streams-app]
  (js/close kafka-streams-app))

;; Starts the topology as soon as this form is evaluated and keeps the value
;; returned by start! so it can later be passed to stop!.
(def hello (start!))

abdullahibra14:04:06

I'm executing that in CIDER (Emacs).

abdullahibra14:04:21

but i don't get any message printed

abdullahibra14:04:26

Is there something I did wrong?

abdullahibra14:04:00

if i run that separately

(def builder (js/streams-builder))
(simple-topology builder) ;; that gives nil -- is it an impure function which starts something in the background?

noisesmith14:04:23

topologies start threads

noisesmith14:04:40

I don't see anything in your example that writes messages the topology would consume?