Fork me on GitHub
#graphql
<
2018-02-23
>
hlship01:02:31

lacinia 0.25.0-rc-2 and lacinia-pedestal 0.7.0-rc-2 up on clojars

nooga10:02:47

Is there an example illustrating subscribtions in lacinia? I’ve read the docs but it’s still a bit unclear to me.

hlship17:02:36

Yes, it's a bit abstract, and I'm a ways off from having the tutorial catch up to that, but it's on my mental roadmap.

gklijs11:02:40

@nooga I could share some snippets, have it working with Kafka as source. Documentation is quit good, but there are many ways to implement it.

nooga11:02:36

so I have a query Coordinates(id: String) which just returns [Coordinate]and what I want is to also give consumers ability to subscribe for that so they will periodically get a Coordinate once the service comes up with a new one. I’m using message queues and stuff but I already have a channel that moves these. It’s just a matter of plugging that into lacinia.

nooga11:02:27

and that part I don’t understand yet

gklijs11:02:23

ok, so here it goes, in my shema.edn I declare a subscription in the :subscriptions like

:stream_transactions
  {:type        :Transaction
   :description "Get transactions as they happen, with optional filtering"
   :args        {:iban       {:type        String
                              :description "optional filter on matching iban"}
                 :min_amount {:type        Int
                              :description "optional filter based on transferred amount"}
                 :direction  {:type        :dtype
                              :description "optional filter on DEBIT or CREDIT"}}
   :stream      :stream-transactions}

gklijs11:02:58

Then I have a stream-map, like

(defn stream-map
  [component]
  (let [t-db (:t-db component)
        a-db (:a-db component)
        m-db (:m-db component)]
    {:stream-transactions (stream-transactions t-db)
     :get-account (get-account a-db)
     :money-transfer (money-transfer m-db)
     }))
that is loaded to the schema with the attach-streamers function
(defn stream-map
  [component]
  (let [t-db (:t-db component)
        a-db (:a-db component)
        m-db (:m-db component)]
    {:stream-transactions (stream-transactions t-db)
     :get-account (get-account a-db)
     :money-transfer (money-transfer m-db)
     }))

gklijs11:02:38

The function to get the stream, returns a function to stop it

(defn stream-transactions
  [t-db]
  (fn [_ args source-stream]
    (log/debug "starting transaction subscription with args" args)
    ;; Create an object for the subscription.
    (let [id (t-db/create-transaction-subscription t-db source-stream (:iban args) (:min_amount args) (:direction args))]
      ;; Return a function to cleanup the subscription
      #(t-db/stop-transaction-subscription t-db id))))

gklijs11:02:33

The function will create a filter, and for each consumerrecord from kafka a function will be called, with also checks for any ‘running’ stream, if the transaction should be send,

(defn add-bc
  [cr transactions index subscriptions]
  (let [^BalanceChanged bc (.value cr)
        bc-map (bc->map bc)
        id (str (.getBalanceId bc))
        iban (.getIban bc)]
    (swap! transactions #(assoc % id bc-map))
    (swap! index #(if (get % iban)
                    (update % iban conj id)
                    (assoc % iban [id])))
    (doseq [[filter-f source-stream] (vals (:map @subscriptions))]
      (if (filter-f bc-map)
        (source-stream bc-map)))))

gklijs12:02:22

By making the Transation data matching the result from the bc->map fuction I don’t need additional resolvers

nooga12:02:37

I see, so the (source-stream bc-map) part would ultimately become (source-stream fresh-coordinate) in my code and effectively send it to the client?

gklijs12:02:43

yes, source-stream works like a function, where you pass the data to, if you send nil the stream is closed

nooga12:02:51

that’s great

nooga12:02:55

I think I got it

nooga12:02:06

thank you!

gklijs12:02:05

I was using lacinia-pedestal, so just had to to start like

(start [this]
    (assoc this :server (-> schema-provider
                            :schema
                            (lp/service-map {:graphiql      true
                                             :subscriptions true})
                            http/create-server
                            http/start)))

nooga12:02:28

yeah, that part I have already 🙂

nooga12:02:54

great stuff, thanks again 👏