#jackdaw
2021-08-25
Leigh Peacock 11:08:37

Hi all! I'm very new to both Clojure and Jackdaw, but decided to have a play and reimplement one of the Scala apps at work in Clojure. I've been following the Serdes example, but I'm struggling on the very last step: sending a message to the output topic. I've hit the following issue, which I've also seen in the issues on GitHub, but I can't find a way past it:

java.lang.IllegalArgumentException: No implementation of method: :clj->avro of protocol: #'jackdaw.serdes.avro/SchemaCoercion found for class: nil
Basically the app reads in messages with a string key and an Avro serde value, does some transformations, and then spits out a long key and an Avro serde value, but I can't work out how to provide the Kafka common serialization long serde as the key serializer. I'm probably down the rabbit hole, but I tried this to mimic the string-serde provided by Jackdaw:
(ns deposit-normaliser-clojure.serdes
  (:gen-class)
  (:import org.apache.kafka.common.serialization.Serdes))

(defn long-serde
  []
  (Serdes/Long))
and then referenced that in the config for the serde-resolver:
(def +topic-metadata+
  ... Omitted...
   :output
   {:topic-name "promos-engine-outbound-avro"
    :partition-count 1
    :replication-factor 1
    :key-serde {:serde-keyword :deposit-normaliser-clojure.serdes/long-serde}
    :value-serde {:serde-keyword :jackdaw.serdes.avro.confluent/serde
                  :schema-filename "outbound.events.avsc"
                  :key? false}}})
Sorry for the long message. It's highly possible I'm missing something completely obvious, so any help would be appreciated 🙂

Sam H 15:08:26

I think you can also just replace

:key-serde {:serde-keyword :deposit-normaliser-clojure.serdes/long-serde}
with
:key-serde deposit-normaliser-clojure.serdes/long-serde

Sam H 15:08:57

But do you know where in your topology this error is happening? Is it when it's trying to produce the Avro before sending to the output topic?

Sam H 15:08:12

tho the exception indicates there might be a problem with the schema you have

Leigh Peacock 15:08:56

Hi there, thanks for the pointers. The more I look at it, the more I think it's something related to the schema too. It fails on the last step, when it goes to forward the message to the outbound topic:

stream-client [deposit-normaliser-9a471287-99db-4157-aa1f-907ec6168d0f] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  - 2021-08-25 12:02:32,601  48472 [deposit-normaliser-9a471287-99db-4157-aa1f-907ec6168d0f-StreamThread-1] ERROR o.a.k.s.KafkaStreams
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic promos-engine-outbound-avro for task 0_8 due to:
java.lang.IllegalArgumentException: No implementation of method: :clj->avro of protocol: #'jackdaw.serdes.avro/SchemaCoercion found for class: nil
... Omitted...
Caused by: java.lang.IllegalArgumentException: No implementation of method: :clj->avro of protocol: #'jackdaw.serdes.avro/SchemaCoercion found for class: nil
	at clojure.core$_cache_protocol_fn.invokeStatic(core_deftype.clj:583)
	at clojure.core$_cache_protocol_fn.invoke(core_deftype.clj:575)
	at jackdaw.serdes.avro$eval2520$fn__2547$G__2511__2556.invoke(avro.clj:128)
	at jackdaw.serdes.avro.RecordType.clj__GT_avro(avro.clj:444)
	at jackdaw.serdes.avro.RecordType.clj__GT_avro(avro.clj:444)
	at jackdaw.serdes.avro$serializer$fn__2973.invoke(avro.clj:524)
	at jackdaw.serdes.fn_impl.FnSerializer.serialize(fn_impl.clj:25)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)

Sam H 15:08:29

weird. Maybe try and do a minimal Avro schema with 1 field and then just see if you can output that

Leigh Peacock 15:08:36

Waiiiiiiiit... Jackdaw mangles the names of the keywords doesn't it? So underscores become hyphens?

Sam H 15:08:43

ah yeah lol

Sam H 15:08:53

there are mangle/unmangle functions

Leigh Peacock 15:08:59

It's been staring me in the face this whole time 😆

Leigh Peacock 15:08:16

Lemme try that

Sam H 16:08:20

https://github.com/FundingCircle/jackdaw/blob/b4f67106cc12a78d9dcc64b6cba668ab6a804d04/src/jackdaw/serdes/avro.clj#L94-L98

(defn- ^String mangle [^String n]
  (str/replace n #"-" "_"))

(defn- ^String unmangle [^String n]
  (str/replace n #"_" "-"))

Leigh Peacock 16:08:42

Does it only do that for deserialisation or does this happen on serialisation too?

Leigh Peacock 16:08:59

I love you 😆

Leigh Peacock 16:08:50

I've spent days trying to work that out haha, sometimes just need a pointer in the right direction, sorry for being a muppet 😆

Sam H 16:08:13

no worries, I have found jackdaw/clojure errors and exceptions can sometimes be cryptic

Sam H 16:08:28

> Does it only do that for deserialisation or does this happen on serialisation too?
It happens in both directions.

Sam H 16:08:55

someone added a PR to make mangling optional over 2 years ago 😅 https://github.com/FundingCircle/jackdaw/pull/126

Leigh Peacock 16:08:30

Yeah, worst thing is I saw this and it didn't register it'd happen on serialisation too, feel like a right donkey haha

Leigh Peacock 16:08:59

They mention in the comments that you just need to hyphenate keywords 🤦