Fork me on GitHub
#code-reviews
<
2019-10-31
>
tdantas23:10:02

hey guys, need some advice / code review the code will communicate with sqs ( I’m using cognitect aws ), fetch messages and dispatch to workers ( responsible to do the job )

aws SQS <----------- fetch -------------> workers

tdantas23:10:06

(ns accounting.queue
  (:require [mount.core :refer [defstate]]
            [accounting.config :refer [config]]
            [cognitect.aws.client.api :as aws]
            [cognitect.aws.credentials :as credentials]
            [cheshire.core :as json]
            [clojure.tools.logging :as log]
            [clj-time.core :as time]))

(defprotocol QueueProtocol
  (messages [queue])
  (delete [queue receipt])
  (run [queue])
  (stop [queue]))

(defn build-client [{:keys [aws_region aws_key aws_secret_key]}]
  (aws/client {:api :sqs
               :region aws_region
               :credentials-provider (credentials/basic-credentials-provider
                                       {:access-key-id aws_key
                                        :secret-access-key aws_secret_key})}))

(defn find-queue [client name]
  (:QueueUrl (aws/invoke client {:op      :GetQueueUrl
                                 :request {:QueueName name}})))

(defn create-queue [client name]
  (aws/invoke client {:op      :CreateQueue
                      :request {:QueueName name}}))

(defn init-queue [client queue-name]
  (or (find-queue client queue-name)
      (create-queue client queue-name)))

(defn delete* [{client :client queue-url :endpoint} receipt]
  (aws/invoke client {:op      :DeleteMessage
                      :request {:QueueUrl queue-url
                                :ReceiptHandle receipt}}))

(defn messages* [{client :client queue-url :endpoint}]
  (aws/invoke client {:op :ReceiveMessage
                      :request {:QueueUrl queue-url}}))


(defmulti worker (fn [{type :type}] (keyword type)))

(defmethod worker :invoice-created [message]
  (generate-invoice message))

(defmethod worker :default [envelope]
  (log/info "envelope received"))


(defn- handle [queue messages]
  (doseq [{receipt :ReceiptHandle body :Body } messages]
    (try
      (worker (json/parse-string body true))
      (delete queue receipt)
      (catch Exception e
          (log/error (ex-message e))))))

(defn run* [queue]
  (future
      (loop []
        (let [messages (messages queue)]
          (doseq [message (:Messages messages)] (handle queue message)))
        (recur))))

(defrecord Queue [client endpoint runner]
  QueueProtocol
  (messages [queue] (messages* queue))
  (delete [queue receipt] (delete* queue receipt))
  (run [queue] (assoc queue runner (run* queue)))
  (stop [_] (when (and runner (not (future-done? runner))
                            (future-cancel runner)))))

(defn build-queues [{queues :aws_sqs_queues :as cfg}]
  (let [client (build-client cfg)]
    (mapv #(run (map->Queue {:client client :endpoint (init-queue client %)})) queues)))

(defstate queue
          :start {:queues (build-queues config)}
          :stop  (doseq [q (:queues queue)] (stop q)))

tdantas23:10:47

don’t know better way to dispatch messages to worker, my solution was using defmethod/defmulti based on the message type

tdantas23:10:01

is that a good usage of defmethod /defmulti ?