Fork me on GitHub
#code-reviews
<
2022-06-02
>
Noah Bogart20:06:15

a coworker is building a polling system to listen to an amazon sqs queue and I'm wondering if this is the smartest way to do it:

(defn start-polling! []
  (let [recur? (volatile! true)]
    (async/go-loop []
     (try (do-stuff-with-sqs)
          (async/<! (async/timeout 100))
          (catch Throwable ex
            (log/fatal ex "an irrecoverable error has occurred")
            (vreset! recur? false)))
     (when @recur? (recur)))
    {::recur? recur?}))
is it safe/smart to call go-block without returning the created channel?

seancorfield21:06:11

@UEENNMX0T Is this for Clojure or ClojureScript?

Noah Bogart21:06:02

i should probably note that fetching from sqs is a blocking action

seancorfield21:06:15

I'd just use future with a loop inside and (Thread/sleep 100) -- using core.async for this seems... overkill...?

Noah Bogart21:06:59

cool, I was kind of hoping to hear that

Noah Bogart21:06:26

we're not doing anything else with core.async inside of the logic part i skipped

seancorfield21:06:05

I assume the result -- {::recur? recur?} -- is so that the caller can tell at some future time that polling has stopped? If so, that's kind of a terrible name (and I'd probably use an atom instead of a volatile)

Noah Bogart21:06:00

maybe I should rephrase the question, cuz I think i'm falling into a bit of an "XY problem". we're polling sqs for messages to dispatch jobs, and this is meant to be a long-running process

Noah Bogart21:06:59

and yeah, the recur? variable isn't great lol, but that felt less important than "fundamental structure of the process"

seancorfield21:06:14

I tend to go for the "simplest sane thing that works" to start with and only make it "fancier" later if we need to. If you have a blocking process, you probably want it on its own thread -- assuming you're not spawning lots of them.

👍 1
adi04:06:26

Could this be a job for an agent? Suppose one adds a watcher (action to triggger on new message) to an agent, and streams a time-spaced sequence of send-offs (sql poll) to the agent? That way one could get locally-cached latest message (or a LIFO shortlist of latest n messages), with optional validators, error handlers, and reactive trigger (thanks to the watcher). One may be able to handle multiple kinds of events this way, given we can send multimethods to agents.

adi04:06:51

spitballin' ...

(reduce (fn [the-agent the-action]
            (send-off the-action the-agent))
        (agent {}) ;; or `def` it and attach watchers etc.
        (repeatedly (fn [] (wait-for 100 :ms) (call-sqs))))
:thinking_face:?

emccue13:06:05

for a queue consumer thats a really stateful thing and we generally want feedback on when things go wrong + marking messages as consumed

emccue13:06:09

so it kinda looks like this for us, with a bunch of callbacks and try/catches around fallible steps

emccue13:06:44

(loop []
  (when-not ... check an atomic boolean to know if we should shutdown (REPL friendly) ...
    ... sleep for a certain amount of time ..
    ... for every message ...
       ... try to handle it and mark it as consumed ...
    (recur)))

👍 1
emccue13:06:55

then the start method for "run-standard-consumer-loop-in-thread" is like

(let [poison-pill (AtomicBoolean. false)]
  ... spawn thread passing ref to poison pill ...
  (fn []
    (.set poison-pill true)
    (.join thread)))

👍 1
emccue14:06:10

so you can have a convenient shutdown callback

Noah Bogart14:06:32

that's very cool, thank you