2018-02-23
Channels
- # aleph (8)
- # aws (2)
- # beginners (36)
- # cider (35)
- # cljs-dev (157)
- # cljsjs (3)
- # cljsrn (5)
- # clojure (97)
- # clojure-dev (5)
- # clojure-gamedev (3)
- # clojure-italy (27)
- # clojure-russia (11)
- # clojure-spec (19)
- # clojure-uk (39)
- # clojured (6)
- # clojurescript (87)
- # clojutre (4)
- # community-development (35)
- # cursive (8)
- # datascript (2)
- # datomic (14)
- # dirac (8)
- # duct (3)
- # figwheel (13)
- # fulcro (22)
- # graphql (20)
- # jobs (1)
- # london-clojurians (1)
- # off-topic (55)
- # onyx (3)
- # parinfer (3)
- # protorepl (39)
- # re-frame (3)
- # reagent (26)
- # ring (7)
- # ring-swagger (2)
- # rum (1)
- # shadow-cljs (107)
- # spacemacs (8)
- # test-check (4)
- # unrepl (3)
Is there an example illustrating subscriptions in lacinia? I’ve read the docs but it’s still a bit unclear to me.
Yes, it's a bit abstract, and I'm a ways off from having the tutorial catch up to that, but it's on my mental roadmap.
@nooga I could share some snippets, have it working with Kafka as a source. The documentation is quite good, but there are many ways to implement it.
so I have a query Coordinates(id: String)
which just returns [Coordinate]
and what I want is to also give consumers the ability to subscribe, so they will periodically get a Coordinate
once the service comes up with a new one.
I’m using message queues and stuff but I already have a channel that moves these. It’s just a matter of plugging that into lacinia.
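A minimal sketch of what that plugging-in could look like, assuming the Coordinates arrive on a core.async channel (coordinates-ch and the other names here are hypothetical):
(require '[clojure.core.async :as async])

;; Hypothetical streamer: forwards each Coordinate from coordinates-ch
;; to the subscriber until the channel closes or the subscription stops.
(defn stream-coordinates
  [coordinates-ch]
  (fn [_ args source-stream]
    (let [stop-ch (async/chan)]
      (async/go-loop []
        (let [[coordinate _] (async/alts! [coordinates-ch stop-ch])]
          ;; nil (closed channel or stop signal) also closes the GraphQL stream
          (source-stream coordinate)
          (when coordinate
            (recur))))
      ;; return a cleanup function, as lacinia expects
      #(async/close! stop-ch))))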
ok, so here it goes: in my schema.edn I declare a subscription under :subscriptions, like
:stream_transactions
{:type :Transaction
 :description "Get transactions as they happen, with optional filtering"
 :args {:iban {:type String
               :description "optional filter on matching iban"}
        :min_amount {:type Int
                     :description "optional filter based on transferred amount"}
        :direction {:type :dtype
                    :description "optional filter on DEBIT or CREDIT"}}
 :stream :stream-transactions}
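For context, the matching object type elsewhere in schema.edn might look roughly like this (the exact fields are an assumption; they are whatever the streamed maps contain):
{:objects
 {:Transaction
  {:fields {:id {:type String}
            :iban {:type String}
            :amount {:type Int}
            :direction {:type :dtype}}}}}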
Then I have a stream-map, like
(defn stream-map
  [component]
  (let [t-db (:t-db component)
        a-db (:a-db component)
        m-db (:m-db component)]
    {:stream-transactions (stream-transactions t-db)
     :get-account (get-account a-db)
     :money-transfer (money-transfer m-db)}))
that is loaded into the schema with the attach-streamers function
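That loading step might look roughly like this (a sketch; the exact wiring in this project may differ, and plain resolvers such as :get-account would normally be attached with util/attach-resolvers):
(require '[clojure.edn :as edn]
         '[clojure.java.io :as io]
         '[com.walmartlabs.lacinia.util :as util]
         '[com.walmartlabs.lacinia.schema :as schema])

;; Sketch of the loading step: read schema.edn, attach the streamers,
;; then compile the schema.
(defn load-schema
  [component]
  (-> (io/resource "schema.edn")
      slurp
      edn/read-string
      (util/attach-streamers (stream-map component))
      schema/compile))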
The function that sets up the stream returns a function to stop it:
(defn stream-transactions
  [t-db]
  (fn [_ args source-stream]
    (log/debug "starting transaction subscription with args" args)
    ;; Create an object for the subscription.
    (let [id (t-db/create-transaction-subscription t-db source-stream (:iban args) (:min_amount args) (:direction args))]
      ;; Return a function to clean up the subscription.
      #(t-db/stop-transaction-subscription t-db id))))
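create-transaction-subscription itself isn't shown; inferring from add-bc below, its shape could be something like this (the id generation, atom location, and field names are all assumptions):
;; Hypothetical: build a predicate from the args and store it with the
;; source-stream under a fresh id, in the same {:map {id [filter-f source-stream]}}
;; structure that add-bc reads below.
(defn create-transaction-subscription
  [t-db source-stream iban min-amount direction]
  (let [id (str (java.util.UUID/randomUUID))
        filter-f (fn [tx]
                   (and (or (nil? iban) (= iban (:iban tx)))
                        (or (nil? min-amount) (>= (:amount tx) min-amount))
                        (or (nil? direction) (= direction (:direction tx)))))]
    (swap! (:subscriptions t-db) assoc-in [:map id] [filter-f source-stream])
    id))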
The function will create a filter, and for each ConsumerRecord from Kafka a function is called which also checks, for every ‘running’ stream, whether the transaction should be sent:
(defn add-bc
  [cr transactions index subscriptions]
  (let [^BalanceChanged bc (.value cr)
        bc-map (bc->map bc)
        id (str (.getBalanceId bc))
        iban (.getIban bc)]
    ;; record the transaction and index it by iban
    (swap! transactions #(assoc % id bc-map))
    (swap! index #(if (get % iban)
                    (update % iban conj id)
                    (assoc % iban [id])))
    ;; notify every running subscription whose filter matches
    (doseq [[filter-f source-stream] (vals (:map @subscriptions))]
      (when (filter-f bc-map)
        (source-stream bc-map)))))
By making the :Transaction data match the result of the bc->map function, I don’t need additional resolvers.
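So bc->map presumably looks something like this (a sketch; the accessors beyond getBalanceId and getIban are assumptions):
;; Hypothetical bc->map: the keys line up with the :Transaction fields,
;; so the map can be streamed to subscribers as-is, with no extra resolvers.
(defn bc->map
  [^BalanceChanged bc]
  {:id (str (.getBalanceId bc))
   :iban (.getIban bc)
   :amount (.getAmount bc)
   :direction (.getDirection bc)})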
I see, so the (source-stream bc-map)
part would ultimately become (source-stream fresh-coordinate)
in my code and effectively send it to the client?
yes, source-stream works like a function that you pass the data to; if you send nil, the stream is closed
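In other words, in nooga's case (fresh-coordinate being his hypothetical value):
(source-stream fresh-coordinate) ;; pushes one Coordinate to the subscriber
(source-stream nil)              ;; closes the stream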