This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-11-27
Can you please help me understand how I can send an event with an attached Avro schema? I do the following:
(def test-schema (slurp (io/resource "resources/test.avsc")))
(def my-producer
  (jc/producer
    {"bootstrap.servers" "localhost:9092"
     "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
     "value.serializer" "io.confluent.kafka.serializers.KafkaAvroSerializer"
     "schema.registry.url" ""}))
(def value-serde
  (serdes.avro.confluent/serde
    ""
    schema-v1
    false))
(with-open [producer' my-producer]
  @(jc/produce! producer'
                {:topic-name "my_topic"}
                "2"
                {:firstName "Hello"
                 :lastName "World"}))
But it doesn't work. I need to create one producer and send events with a specified Avro schema to topics.
So my question is: how can I implement this code using Jackdaw?
private fun createProducer(brokers: String, schemaRegistryUrl: String): Producer<String, GenericRecord> {
    val props = Properties()
    props["bootstrap.servers"] = brokers
    props["key.serializer"] = StringSerializer::class.java
    props["value.serializer"] = KafkaAvroSerializer::class.java
    props["schema.registry.url"] = schemaRegistryUrl
    return KafkaProducer<String, GenericRecord>(props)
}
val schema = Schema.Parser().parse(File("src/main/resources/person.avsc"))
val avroPerson = GenericRecordBuilder(schema).apply {
    set("firstName", fakePerson.firstName)
    set("lastName", fakePerson.lastName)
    set("birthDate", fakePerson.birthDate.time)
}.build()
val futureResult = producer.send(ProducerRecord(personsAvroTopic, avroPerson))
Or if you know a better way, please let me know.
Ok, I've completed it, but using Java interop
(:require
  [clojure.java.io :as io]
  [jackdaw.client :as jc])
(:import
  [org.apache.avro Schema$Parser]
  [org.apache.avro.generic GenericData$Record])
(def test-schema
  (.parse (Schema$Parser.)
          (slurp (io/resource "resources/test.avsc"))))
(def rec (GenericData$Record. test-schema))
(.put rec "firstName" "Hello")
(.put rec "lastName" "World")
(def p
  (jc/producer {"bootstrap.servers" "localhost:9092"
                :key.serializer "org.apache.kafka.common.serialization.StringSerializer"
                :value.serializer "io.confluent.kafka.serializers.KafkaAvroSerializer"
                :schema.registry.url ""}))
(with-open [producer' p]
  @(jc/produce! producer'
                {:topic-name "messages_by_sources"}
                "1"
                rec))
How can I do it using Jackdaw?
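One possible direction, as a rough sketch rather than a confirmed answer: the first attempt above builds a serde with serdes.avro.confluent/serde but never hands it to the producer. Jackdaw's producer accepts a second argument with :key-serde and :value-serde, so the serde can be passed there and a plain Clojure map sent as the value. The namespace name, the send-event! helper, and the registry URL "http://localhost:8081" are assumptions for illustration only (the real URL is blank in the messages above), and the map keys are assumed to match the field names in test.avsc.

```clojure
(ns example.avro-producer
  (:require [clojure.java.io :as io]
            [jackdaw.client :as jc]
            [jackdaw.serdes :as js]
            [jackdaw.serdes.avro.confluent :as jsac]))

;; Assumption: the registry URL is blank in the original messages;
;; this is only the conventional local default.
(def schema-registry-url "http://localhost:8081")

;; Only the broker address is needed here, because the serializers
;; come from the serdes passed to jc/producer below.
(def producer-config
  {"bootstrap.servers" "localhost:9092"})

(defn avro-value-serde
  "Builds a Confluent Avro serde for record values from a JSON schema string."
  [schema-json]
  ;; The third argument is key?; false means the serde is used for values.
  (jsac/serde schema-registry-url schema-json false))

(defn send-event!
  "Hypothetical helper: sends one record and blocks until the broker acks it."
  [producer topic-name k v]
  @(jc/produce! producer {:topic-name topic-name} k v))

(defn -main [& _]
  (let [schema (slurp (io/resource "resources/test.avsc"))]
    (with-open [p (jc/producer producer-config
                               {:key-serde   (js/string-serde)
                                :value-serde (avro-value-serde schema)})]
      (send-event! p "my_topic" "2" {:firstName "Hello"
                                     :lastName  "World"}))))
```

Note that a serde built this way is tied to a single schema, so this producer can only send values of that one record type. If one producer has to send records with different schemas to different topics, the interop version above with KafkaAvroSerializer and GenericData$Record may still be the simpler fit.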