This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2024-04-19
Channels
- # announcements (2)
- # babashka (53)
- # beginners (71)
- # calva (15)
- # clerk (4)
- # cljfx (1)
- # clojure (18)
- # clojure-europe (13)
- # clojure-nl (1)
- # clojure-norway (26)
- # clojure-uk (4)
- # datascript (2)
- # datomic (3)
- # emacs (4)
- # events (1)
- # fulcro (26)
- # honeysql (2)
- # hyperfiddle (6)
- # missionary (8)
- # reitit (2)
- # releases (2)
- # shadow-cljs (10)
- # squint (33)
- # xtdb (4)
if you don't mind relying on (very) internal APIs (with all the usual caveats), you could use this until we're in a place to commit to a streaming API:
;; use with `into`, `transduce`, `reduce`, `run!`, etc
(defn plan-q
([node query] (plan-q node query {}))
([node query opts]
(let [opts (-> (into {:default-all-valid-time? false} opts)
(xtdb.time/after-latest-submitted-tx node))]
(reify clojure.lang.IReduceInit
(reduce [_ f init]
(with-open [^java.util.stream.Stream res @(xtdb.protocols/open-query& node query opts)]
(f (.reduce res init
(reify java.util.function.BinaryOperator
(apply [_ a b]
(f a b)))))))))))
❤️ 2
or, if you don't mind working with Java Streams, a simpler version:
(defn open-q-stream
(^java.util.stream.Stream [node query] (open-q-stream node query {}))
(^java.util.stream.Stream [node query opts]
(let [opts (-> (into {:default-all-valid-time? false} opts)
(xtdb.time/after-latest-submitted-tx node))]
@(xtdb.protocols/open-query& node query opts))))
;; usage
(with-open [^Stream res (xt/open-q-stream *node* '(from :docs [{:xt/id e} inst]))]
(vec (.toList res)))