Fork me on GitHub
#jackdaw
<
2020-11-27
>
y.khmelevskii20:11:39

Can you please help me to understand how I can send event with attached avro schema? I do the following:

(def test-schema (slurp (io/resource "resources/test.avsc"))

(def my-producer
    (jc/producer
     {"bootstrap.servers" "localhost:9092"
      "key.serializer"      "org.apache.kafka.common.serialization.StringSerializer"
      "value.serializer"    "io.confluent.kafka.serializers.KafkaAvroSerializer"
      "schema.registry.url" ""}))

  (def value-serde
    (serdes.avro.confluent/serde
     ""
     schema-v1
     false))

(with-open [producer' my-producer]
    @(jc/produce! producer'
                  {:topic-name "my_topic"}
                  "2"
                  {:firstName "Hello"
                      :lastName  "World"}))

y.khmelevskii20:11:43

But it doesn't work. I need to create one producer. And send events with specified avro schema to topics

y.khmelevskii22:11:20

so, my question is, how I can implement this code using jackdaw

private fun createProducer(brokers: String, schemaRegistryUrl: String): Producer<String, GenericRecord> {
    val props = Properties()
    props["bootstrap.servers"] = brokers
    props["key.serializer"] = StringSerializer::class.java
    props["value.serializer"] = KafkaAvroSerializer::class.java
    props["schema.registry.url"] = schemaRegistryUrl
    return KafkaProducer<String, GenericRecord>(props)
}
val schema = Schema.Parser().parse(File("src/main/resources/person.avsc"))
val avroPerson = GenericRecordBuilder(schema).apply {
    set("firstName", fakePerson.firstName)
    set("lastName", fakePerson.lastName)
    set("birthDate", fakePerson.birthDate.time)
}.build()
val futureResult = producer.send(ProducerRecord(personsAvroTopic, avroPerson))

y.khmelevskii22:11:06

or if you know better way, please let me know

y.khmelevskii23:11:45

Ok, I've completed it, but using Java interop

(:require
 [ :as io]
 [jackdaw.client :as jc])
(:import
 [org.apache.avro Schema$Parser]
 [org.apache.avro.generic GenericData$Record])

(def test-schema
  (.parse (Schema$Parser.)
          (slurp (io/resource "resources/test.avsc"))))

(def rec (GenericData$Record. test-schema))
(.put rec "firstName" "Hello")
(.put rec "lastName" "World")

(def p
  (jc/producer {"bootstrap.servers" "localhost:9092"
                :key.serializer      "org.apache.kafka.common.serialization.StringSerializer"
                :value.serializer    "io.confluent.kafka.serializers.KafkaAvroSerializer"
                :schema.registry.url ""}))

(with-open [producer' p]
  @(jc/produce! producer'
                {:topic-name "messages_by_sources"}
                "1"
                rec))
How I can do it using Jackdaw?