This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-06-02
Channels
- # announcements (34)
- # babashka (19)
- # beginners (106)
- # calva (50)
- # cider (25)
- # clj-commons (39)
- # clj-kondo (16)
- # clojure (59)
- # clojure-czech (3)
- # clojure-europe (33)
- # clojure-norway (9)
- # clojure-seattle (1)
- # clojure-sweden (1)
- # clojure-uk (2)
- # clojured (28)
- # clojuredesign-podcast (1)
- # clojurescript (7)
- # code-reviews (19)
- # conjure (15)
- # cursive (3)
- # datomic (3)
- # emacs (21)
- # etaoin (28)
- # graphql (4)
- # introduce-yourself (1)
- # joyride (2)
- # kaocha (2)
- # london-clojurians (8)
- # lsp (24)
- # music (4)
- # nbb (4)
- # nextjournal (1)
- # off-topic (13)
- # other-languages (16)
- # remote-jobs (1)
- # rewrite-clj (6)
- # sci (1)
- # shadow-cljs (40)
- # tools-deps (15)
a coworker is building a polling system to listen to an amazon sqs queue and I'm wondering if this is the smartest way to do it:
(defn start-polling! []
(let [recur? (volatile! true)]
(async/go-loop []
(try (do-stuff-with-sqs)
(async/<! (async/timeout 100))
(catch Throwable ex
(log/fatal ex "an irrecoverable error has occurred")
(vreset! recur? false)))
(when @recur? (recur)))
{::recur? recur?}))
is it safe/smart to call go-block
without returning the created channel?@UEENNMX0T Is this for Clojure or ClojureScript?
Clojure
i should probably note that fetching from sqs is a blocking action
I'd just use future
with a loop
inside and (Thread/sleep 100)
-- using core.async
for this seems... overkill...?
cool, I was kind of hoping to hear that
we're not doing anything else with core.async
inside of the logic part i skipped
I assume the result -- {::recur? recur?}
-- is so that the caller can tell at some future time that polling has stopped? If so, that's kind of a terrible name (and I'd probably use an atom
instead of a volatile)
maybe I should rephrase the question, cuz I think i'm falling into a bit of an "XY problem". we're polling sqs for messages to dispatch jobs, and this is meant to be a long-running process
and yeah, the recur?
variable isn't great lol, but that felt less important than "fundamental structure of the process"
I tend to go for the "simplest sane thing that works" to start with and only make it "fancier" later if we need to. If you have a blocking process, you probably want it on its own thread -- assuming you're not spawning lots of them.
Could this be a job for an agent
?
Suppose one adds a watcher (action to triggger on new message) to an agent, and streams a time-spaced sequence of send-offs (sql poll) to the agent?
That way one could get locally-cached latest message (or a LIFO shortlist of latest n messages), with optional validators, error handlers, and reactive trigger (thanks to the watcher).
One may be able to handle multiple kinds of events this way, given we can send multimethods to agents.
spitballin' ...
(reduce (fn [the-agent the-action]
(send-off the-action the-agent))
(agent {}) ;; or `def` it and attach watchers etc.
(repeatedly (fn [] (wait-for 100 :ms) (call-sqs))))
:thinking_face:?for a queue consumer thats a really stateful thing and we generally want feedback on when things go wrong + marking messages as consumed
so it kinda looks like this for us, with a bunch of callbacks and try/catches around fallible steps
(loop []
(when-not ... check an atomic boolean to know if we should shutdown (REPL friendly) ...
... sleep for a certain amount of time ..
... for every message ...
... try to handle it and mark it as consumed ...
(recur)))
then the start method for "run-standard-consumer-loop-in-thread" is like
(let [poison-pill (AtomicBoolean. false)]
... spawn thread passing ref to poison pill ...
(fn []
(.set poison-pill true)
(.join thread)))
that's very cool, thank you