This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2019-10-31
Channels
- # announcements (5)
- # babashka (105)
- # beginners (92)
- # calva (77)
- # cider (17)
- # cljdoc (8)
- # cljs-dev (8)
- # cljsrn (8)
- # clojure (272)
- # clojure-dev (25)
- # clojure-europe (5)
- # clojure-italy (6)
- # clojure-nl (7)
- # clojure-norway (3)
- # clojure-uk (108)
- # clojurescript (326)
- # code-reviews (4)
- # cursive (6)
- # datomic (37)
- # duct (5)
- # emacs (14)
- # fulcro (23)
- # graphql (1)
- # juxt (1)
- # kaocha (2)
- # leiningen (10)
- # malli (9)
- # music (1)
- # nrepl (12)
- # pathom (21)
- # pedestal (2)
- # planck (4)
- # quil (3)
- # reitit (29)
- # rewrite-clj (10)
- # shadow-cljs (82)
- # spacemacs (29)
- # sql (6)
- # tools-deps (19)
Hey guys, I need some advice / a code review. The code communicates with SQS (I’m using Cognitect's aws-api), fetches messages, and dispatches them to workers (which are responsible for doing the job).
aws SQS <----------- fetch -------------> workers
(ns accounting.queue
(:require [mount.core :refer [defstate]]
[accounting.config :refer [config]]
[cognitect.aws.client.api :as aws]
[cognitect.aws.credentials :as credentials]
[cheshire.core :as json]
[clojure.tools.logging :as log]
[clj-time.core :as time]))
(defprotocol QueueProtocol
  "Operations supported by a message queue handle."
  (messages [queue]
    "Fetches a batch of messages from the queue.")
  (delete [queue receipt]
    "Deletes the message identified by `receipt` from the queue.")
  (run [queue]
    "Starts background processing for the queue.")
  (stop [queue]
    "Stops background processing for the queue."))
(defn build-client
  "Creates an aws-api client for the `:sqs` API.

  Reads `:aws_region`, `:aws_key` and `:aws_secret_key` from the given
  config map and wires the key pair in through a basic credentials provider."
  [{region :aws_region, key-id :aws_key, secret :aws_secret_key}]
  (let [provider (credentials/basic-credentials-provider
                   {:access-key-id     key-id
                    :secret-access-key secret})]
    (aws/client {:api                  :sqs
                 :region               region
                 :credentials-provider provider})))
(defn find-queue
  "Invokes `:GetQueueUrl` for the queue called `name` and returns the
  `:QueueUrl` entry of the response (nil when the response has none)."
  [client name]
  (let [response (aws/invoke client {:op      :GetQueueUrl
                                     :request {:QueueName name}})]
    (:QueueUrl response)))
(defn create-queue
  "Invokes `:CreateQueue` for a queue called `name` and returns the raw
  response of that operation."
  [client name]
  (let [operation {:op      :CreateQueue
                   :request {:QueueName name}}]
    (aws/invoke client operation)))
(defn init-queue
  "Returns the URL of the queue named `queue-name`, creating the queue when
  the lookup yields no URL.

  `find-queue` already returns the bare `:QueueUrl`, whereas `create-queue`
  returns the whole CreateQueue response map. The URL is extracted from that
  response here so callers always receive the same kind of value (a URL, or
  nil if neither call produced one)."
  [client queue-name]
  (or (find-queue client queue-name)
      (:QueueUrl (create-queue client queue-name))))
(defn delete*
  "Invokes `:DeleteMessage` for the message identified by `receipt`.

  The first argument is a map (or record) whose `:client` is the aws-api
  client and whose `:endpoint` is used as the queue URL. Returns the raw
  response of the operation."
  [{:keys [client endpoint]} receipt]
  (let [request {:QueueUrl      endpoint
                 :ReceiptHandle receipt}]
    (aws/invoke client {:op :DeleteMessage :request request})))
(defn messages*
  "Invokes `:ReceiveMessage` against the queue URL stored under `:endpoint`,
  using the aws-api client stored under `:client`. Returns the raw response
  of the operation."
  [{:keys [client endpoint]}]
  (let [request {:QueueUrl endpoint}]
    (aws/invoke client {:op :ReceiveMessage :request request})))
;; Dispatches a message to the worker registered for its `:type`.
;; The `:type` value is coerced with `keyword`, so string types coming from
;; parsed JSON select keyword-dispatched methods.
(defmulti worker
  (fn [message]
    (let [{:keys [type]} message]
      (keyword type))))
;; Worker for messages of type :invoice-created: hands the whole message to
;; `generate-invoice`.
;; NOTE(review): `generate-invoice` is not defined or required in the visible
;; part of this namespace - confirm where it comes from.
(defmethod worker :invoice-created
  [invoice-event]
  (generate-invoice invoice-event))
;; Fallback worker for any message whose type has no dedicated method:
;; the message itself is ignored and only an info line is logged.
(defmethod worker :default
  [_envelope]
  (log/info "envelope received"))
(defn- handle
  "Processes a collection of received messages.

  For each message the `:Body` is parsed as JSON (with keyword keys), passed
  to `worker`, and then the message is deleted from `queue` using its
  `:ReceiptHandle`. A failure while processing one message is logged and does
  not prevent the remaining messages from being processed; the failed message
  is not deleted.

  `messages` must be a collection of message maps (nil is treated as empty)."
  [queue messages]
  (doseq [{receipt :ReceiptHandle body :Body} messages]
    (try
      (worker (json/parse-string body true))
      (delete queue receipt)
      (catch InterruptedException e
        ;; Interruption is how a cancelled future is told to stop. Swallowing
        ;; it here would clear the signal and keep the polling loop alive, so
        ;; restore the flag and let it propagate.
        (.interrupt (Thread/currentThread))
        (throw e))
      (catch Exception e
        ;; Pass the throwable itself so the stack trace is logged, not just
        ;; the (possibly nil) message.
        (log/error e "failed to handle message:" (ex-message e))))))
(defn run*
  "Starts a background polling loop for `queue` and returns the future
  running it.

  Each iteration fetches a batch via `messages` and passes the `:Messages`
  collection of the response to `handle`. `handle` iterates over a collection
  of messages, so the whole batch is passed in one call; passing individual
  message maps would make it iterate over the entries of each map instead.

  The loop exits once the thread has been interrupted, which is what
  `future-cancel` does to a running future."
  [queue]
  (future
    (loop []
      (let [response (messages queue)]
        (handle queue (:Messages response)))
      (when-not (.isInterrupted (Thread/currentThread))
        (recur)))))
;; A queue handle:
;;   client   - aws-api client used for all operations
;;   endpoint - queue URL passed as :QueueUrl
;;   runner   - future running the polling loop, or nil when not started
(defrecord Queue [client endpoint runner]
  QueueProtocol
  (messages [queue] (messages* queue))
  (delete [queue receipt] (delete* queue receipt))
  ;; Store the future under the :runner key. Using the bare field symbol
  ;; `runner` here would assoc under its current value (nil) and leave the
  ;; :runner field empty, so `stop` could never find the future.
  (run [queue] (assoc queue :runner (run* queue)))
  ;; Cancel the polling future only if one exists and is still running.
  ;; Returns the result of `future-cancel`, or nil when there was nothing
  ;; to cancel.
  (stop [_]
    (when (and runner (not (future-done? runner)))
      (future-cancel runner))))
(defn build-queues
  "Builds and starts one `Queue` per name listed under `:aws_sqs_queues`.

  A single client is built from `cfg` and shared by all queues. Each queue's
  `:endpoint` comes from `init-queue`, and `run` is called on it right away.
  Returns a vector of the values returned by `run`."
  [{queues :aws_sqs_queues :as cfg}]
  (let [client      (build-client cfg)
        start-queue (fn [queue-name]
                      (let [endpoint (init-queue client queue-name)]
                        (run (map->Queue {:client   client
                                          :endpoint endpoint}))))]
    (mapv start-queue queues)))
;; Mount state holding the running queues.
;;   :start - builds and starts every configured queue, kept under :queues
;;   :stop  - calls `stop` on each of those queues
(defstate queue
  :start {:queues (build-queues config)}
  :stop  (run! stop (:queues queue)))