#jackdaw
2021-06-04
Laura Wyglendacz09:06:54

hello 👋

Laura Wyglendacz09:06:59

I’m trying to use jackdaw to implement a program that windows by time, counts, suppresses, and outputs, almost identical to this Java example: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#window-final-results

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

Laura Wyglendacz09:06:12

My code using jackdaw function wrappers around the same kstreams methods:

(defn build-topology!
  [config builder]
  (-> (js/kstream builder (config/topic (:INPUT_TOPIC config)))
      (js/filter moder/moderated?)
      (js/group-by (fn [[_ v]] (:goalId v)))
      (js/window-by-time (.grace (TimeWindows/of (Duration/ofMinutes 5)) (Duration/ofMillis 1)))
      (js/count)
      (js/suppress {})
      (js/to-kstream)
      (js/print!) ...
This works fine, except for the suppress function, which, when added, throws:
java.lang.RuntimeException: No reader function for tag object
	at clojure.lang.EdnReader$TaggedReader.readTagged(EdnReader.java:801) ...

Sam H12:06:47

Is this while running the code in a REPL? You could be seeing an issue with the underlying clojure.java-time lib: https://github.com/dm3/clojure.java-time/issues/15

Laura Wyglendacz16:06:58

No, not in a repl, when I run with lein

Sam H09:06:38

Hey, not sure if you ever solved this, but I ran into something similar. It was due to going from a jackdaw avro serde to an edn serde, but the avro had a field with a type of bytes (decimal logical type). This caused the serialization to edn to fail because that field value had a BufferBytea class.
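
A minimal sketch of how this exact error message can come up, assuming the value being written as EDN contains a plain Java object (here a java.nio.ByteBuffer, picked only for illustration; the map key :amount is also made up). Clojure prints such objects as an #object[...] tagged literal, and clojure.edn has no reader for that tag, so reading the string back fails. This only reproduces the message with plain Clojure; it does not show what suppress does internally.

```clojure
(require '[clojure.edn :as edn])
(import 'java.nio.ByteBuffer)

;; Hypothetical record value: a map holding a raw Java object with no EDN form.
(def value {:amount (ByteBuffer/wrap (byte-array [1 2 3]))})

;; pr-str falls back to the #object[...] tagged literal for unknown Java objects.
(def printed (pr-str value))

;; Reading it back fails because clojure.edn has no reader for the `object` tag.
(def error-message
  (try
    (edn/read-string printed)
    nil
    (catch RuntimeException e
      (.getMessage e))))

(println printed)
(println error-message)
```

If something like this is the cause, one option is to convert such fields to an EDN-friendly type (for example a number or a string) before they reach the edn serde.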

Laura Wyglendacz09:06:01

Has anyone come across this before? It has me totally stumped, as I’m not sure what suppress is trying to serde or why it is going wrong