Fork me on GitHub
#xtdb
<
2024-04-19
>
Panel06:04:34

Is it currently possible to stream rows out of xtdb2 ?

jarohen08:04:39

if you don't mind relying on (very) internal APIs (with all the usual caveats), you could use this until we're in a place to commit to a streaming API:

;; use with `into`, `transduce`, `reduce`, `run!`, etc

(defn plan-q
  ([node query] (plan-q node query {}))
  ([node query opts]
   (let [opts (-> (into {:default-all-valid-time? false} opts)
                  (xtdb.time/after-latest-submitted-tx node))]

     (reify clojure.lang.IReduceInit
       (reduce [_ f init]
         (with-open [^java.util.stream.Stream res @(xtdb.protocols/open-query& node query opts)]
           (f (.reduce res init
                       (reify java.util.function.BinaryOperator
                         (apply [_ a b]
                           (f a b)))))))))))

❤️ 2
jarohen08:04:01

or, if you don't mind working with Java Streams, a simpler version:

(defn open-q-stream
  (^java.util.stream.Stream [node query] (open-q-stream node query {}))
  (^java.util.stream.Stream [node query opts]
   (let [opts (-> (into {:default-all-valid-time? false} opts)
                  (xtdb.time/after-latest-submitted-tx node))]

     @(xtdb.protocols/open-query& node query opts))))

;; usage

(with-open [^Stream res (xt/open-q-stream *node* '(from :docs [{:xt/id e} inst]))]
  (vec (.toList res)))