#xtdb
2020-07-03
Jacob O'Bryant 06:07:39

I'm trying to integrate Crux with http://materialize.io. I think/hope it'll work if I use the Kafka Connect plugin (with mode=doc and format=json) and Materialize's "text or bytes over kafka" source (with ENVELOPE UPSERT). However, I'm having trouble with the Kafka Connect plugin. Given the following, test.sink.txt gets created but remains empty:

; deps.edn

{:deps
 {juxt/crux-core {:mvn/version "20.05-1.8.4-alpha"}
  juxt/crux-kafka-embedded {:mvn/version "20.05-1.8.4-alpha"}
  juxt/crux-rocksdb {:mvn/version "20.05-1.8.4-alpha"}}}

; src/foo.clj

(ns foo
  (:require
    [crux.api :as crux]
    [crux.kafka.embedded :as ek]))

(defn -main []
  (let [node (crux/start-node
               {:crux.kv/db-dir (str "data/crux-db")
                :crux.http-server/port 3000
                :crux.node/topology '[crux.standalone/topology crux.kv.rocksdb/kv-store]
                :crux.standalone/event-log-dir "data/eventlog"
                :crux.standalone/event-log-kv-store 'crux.kv.rocksdb/kv})]
    (ek/start-embedded-kafka
      #:crux.kafka.embedded{:zookeeper-data-dir "data/zk-data"
                            :kafka-log-dir "data/kafka-log"
                            :kafka-port 9092})
    (loop []
      (Thread/sleep 1000)
      (println "Submitting event")
      (crux/submit-tx node [[:crux.tx/put {:crux.db/id (java.util.UUID/randomUUID)
                                           :timestamp (java.util.Date.)
                                           :text "some event"}]])
      (recur))))

; shell

$ clj -m foo
$ cd kafka_2.12-2.3.0 # new terminal tab
$ ls libs/crux* config/local*
config/local-crux-source.properties  libs/crux-kafka-connect-19.12-1.6.1-alpha-standalone.jar
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
                      --replication-factor 1 --partitions 1 --topic connect-test
$ bin/connect-standalone.sh config/connect-standalone.properties \
                            config/local-crux-source.properties \
                            config/connect-file-sink.properties
Is it expected that this would work, or am I missing something?

Jacob O'Bryant 06:07:37

actually I just noticed this exception in the output:

[2020-07-03 00:25:34,790] ERROR Failed to start task local-crux-source-0 (org.apache.kafka.connect.runtime.Worker:470)
java.lang.ExceptionInInitializerError
...
Caused by: Syntax error compiling new at (taoensso/nippy/compression.clj:43:15).
...
Caused by: java.lang.ClassNotFoundException: org.tukaani.xz.XZOutputStream
investigating that now...
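
A quick way to confirm whether a given connector jar actually bundles the xz classes named in the ClassNotFoundException above is to list the jar's entries. The following is a minimal sketch, assuming `unzip` is installed; the helper names `class_to_entry` and `jar_has_class` are hypothetical and not part of Crux or Kafka.

```shell
# Convert a fully qualified class name to its path inside a jar,
# e.g. org.tukaani.xz.XZOutputStream -> org/tukaani/xz/XZOutputStream.class
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Succeed (exit 0) if the jar at $1 contains an entry for class $2.
# Assumes `unzip` is available; -Z1 lists entry names one per line.
# Only exact matches at the original package path count, so shaded or
# relocated copies of the class are not detected.
jar_has_class() {
  unzip -Z1 "$1" 2>/dev/null | grep -qxF "$(class_to_entry "$2")"
}

# Example usage with the jar from the listing above:
# jar_has_class libs/crux-kafka-connect-19.12-1.6.1-alpha-standalone.jar \
#   org.tukaani.xz.XZOutputStream && echo "xz bundled" || echo "xz missing"
```

If the check reports the class as missing, the jar was built without the xz dependency, which would be consistent with the stack trace.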

refset 07:07:05

Ah, the connector on Confluent Hub is stale, have you tried building a fresh version from source? Sadly the process for updating the version on the hub is very manual so we've not kept it up to date (sorry!). I remember we solved some issue with xz already so perhaps it will "just work" now with a newer version

refset 07:07:55

Hmm, actually, looking at the date of the last xz fix (https://github.com/juxt/crux/commit/16de9f879d394044365992edafca196e6172cf6d), it seems this might be a current issue. I'll see if I can repro.

Jacob O'Bryant 07:07:02

thanks for taking a look. I did try building from source, but lein package gave me clojure.lang.Compiler$CompilerException: clojure.lang.LispReader$ReaderException: java.lang.RuntimeException: No reader function for tag Inf, compiling:(mranderson/inlined/parallel/v0v10/parallel/core.clj:323:31)

refset 21:07:11

Hey again, sorry for only just looking into this. lein package seemed to work fine for me just now. What does your lein -v show? Mine is Leiningen 2.9.3 on Java 11.0.6 OpenJDK 64-Bit Server VM

Jacob O'Bryant 22:07:26

ah, I was on an old version of lein (and Java 8). I have Leiningen 2.9.3 on Java 11.0.7 OpenJDK 64-Bit Server VM now and lein package works fine. Thanks, I'll let you know if I run into any other issues with the connector.

Jacob O'Bryant 07:07:28

@refset I ran the example above with the newly-built package:

$ ls libs/crux*
libs/crux-kafka-connect-20.06-1.9.1-beta-SNAPSHOT-standalone.jar
But I get the same exception. Want to see if you can reproduce this? 🙂