This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2023-08-08
Channels
- # announcements (1)
- # babashka (39)
- # beginners (44)
- # clj-kondo (10)
- # cljdoc (24)
- # clojure (49)
- # clojure-austin (2)
- # clojure-berlin (6)
- # clojure-europe (13)
- # clojure-nl (1)
- # clojure-norway (5)
- # clojure-uk (1)
- # core-async (11)
- # cursive (7)
- # datahike (3)
- # datalevin (2)
- # fulcro (1)
- # hyperfiddle (40)
- # jobs (12)
- # juxt (5)
- # lsp (9)
- # nyc (1)
- # off-topic (27)
- # re-frame (7)
- # releases (3)
- # shadow-cljs (9)
- # timbre (6)
- # xtdb (2)
- # yamlscript (1)
👋 i was hoping to get some conceptual help from someone more experienced with core.async and specifically, threads and exceptions. i’ve been reading through this slack history as well as resources like https://wil.yegelwel.com/Error-Handling-with-Clojure-Async/ and https://gist.github.com/vvvvalvalval/f1250cec76d3719a8343, but i still can’t really grok my issue.
i’m using proletariat as a job runner for background processing. it has a pool of workers who run on their own thread- they pick up a not-yet-run job, and execute it on their individual thread. if something in that job throws, the worker will catch the exception and put the job back in the queue so a separate re-try process can work out what to do.
i realized pretty quickly that i have an incomplete understanding of core.async. suppose that a job on this worker thread needs to execute a long-running process. i want updates on that process over time- which is a good candidate for core.async and using a progress-ch
onto which the process can put!
updates over time. what i’ve worked out so far is this:
;; do a process with updates
(let [ex-ch (a/chan)
update-ch (a/chan 5)
result-ch (a/thread
(try
(do (long-running-process update-ch))
(catch Exception e
(a/>!! ex-ch e))))]
(loop [[v ch] (a/alts!! [update-ch result-ch ex-ch])]
(cond
(= ch result-ch)
:done
(= ch ex-ch)
:error
(= ch update-ch)
(do
(db/async-update v)
(recur (a/alts!! [update-ch result-ch ex-ch]))))))))
i guess what i’m mostly concerned with is this:
1. how do these a/threads get cleaned up? assuming in (= ch ex-ch)
i rethrew the error caught in the a/thread
, this would trigger proletariat to rerun the job on a new worker. i really don’t understand what happens to these channels in the aftermath.
2. is this a reasonable approach, or am i going halfway around the world to reinvent a mechanism that already exists for this? a clojure.core.async/thread is basically the same as clojure.core/future, but instead of conveying the result via a java Future it does it over a channel
I think having distinct channels for errors and for values is fine. More often what I see is conveying errors as values over the same channel, but that can be confusing sometimes
ah. okay. so the blocking nature of alts!! is saving me here and preventing a world of confusing concurrency brain-hurt debugging. the thread executing this code will block until the future (the a/thread
) resolves. if, conversely, this were all happening inside a (go ...)
the executing thread would pick up my job, start an async process in the go-block and immediately finish without waiting (because that’s the nature of go
). i have no idea what would happen to those channels or the running process in that case either, lol. garbage collected eventually? zombied forever?
the goal of (go ...)
is to execute in a way where the behavior is as much like (thread ...)
as it can be while not actually taking up a thread