This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2022-01-14
Channels
- # announcements (2)
- # aws (1)
- # babashka (18)
- # babashka-sci-dev (103)
- # beginners (165)
- # calva (51)
- # cider (8)
- # circleci (1)
- # clj-kondo (22)
- # clj-on-windows (2)
- # cljdoc (1)
- # cljfx (31)
- # cljs-dev (16)
- # clojure (81)
- # clojure-europe (71)
- # clojure-nl (7)
- # clojure-uk (11)
- # clojurescript (20)
- # code-reviews (26)
- # conjure (1)
- # contributions-welcome (1)
- # core-async (15)
- # cursive (8)
- # datomic (8)
- # defnpodcast (2)
- # eastwood (24)
- # emacs (10)
- # events (1)
- # fulcro (4)
- # funcool (31)
- # graalvm (43)
- # graphql (8)
- # honeysql (9)
- # introduce-yourself (1)
- # jobs (12)
- # kaocha (3)
- # lsp (28)
- # malli (4)
- # meander (4)
- # membrane (7)
- # off-topic (64)
- # other-languages (3)
- # pedestal (1)
- # polylith (31)
- # portal (5)
- # re-frame (4)
- # reitit (1)
- # releases (5)
- # rum (2)
- # schema (2)
- # sci (34)
- # shadow-cljs (21)
- # vscode (1)
@benjamin.schwerdtner This is what I came up with when I thought how I'd do it:
(defn make-flower-sender
[]
(let [stopping-chan (a/promise-chan)
flowers [:lily :rose :tulip :orchid]
destinations (atom {})]
(a/thread
(try
(loop []
(when-not (a/poll! stopping-chan)
(doseq [[_country cities] @destinations
:let [city (rand-nth (keys cities))
city-chan (city cities)]]
(println "putting into " city)
(a/put! city-chan (rand-nth flowers)))
(Thread/sleep 1000)
(recur)))
(catch Exception e
(println e))))
{:stop #(a/put! stopping-chan true)
:new-destination (fn[country city]
(let [c (a/chan (a/dropping-buffer 1))]
(swap! destinations
update country assoc city c)
c))}))
(def flower-sender (make-flower-sender))
(def usa-seattle ((:new-destination flower-sender) :usa :seattle))
(def usa-new-york ((:new-destination flower-sender) :usa :new-york))
(def canada-toronto ((:new-destination flower-sender) :canada :toronto))
(a/poll! usa-seattle)
(a/poll! usa-new-york)
(a/poll! canada-toronto)
((:stop flower-sender))
You can add new country/city combinations, it creates a new channel for it and starts sending flowers to them (only one city each second per country). Then I'd assume you'd have things that want to receive the flowers per city and process them, they can just do so on the channel returned by new-destination
. When you want to stop the sending of flower, call stop
.
When you use a/go, you're running on the async pool. That pool you should almost think of it as a single thread.
So think of everything you run inside a/go as a single thread. That means if you run anything which takes a while until it either finishes running or parks, it is hogging the thread and nothing else can run.
When you want concurrency and high responsiveness, you want to make sure everything waits as little as possible before getting a turn.
That means you should exclusively restrict the use of a/go for very short running computations or non-blocking io. Anything that is longer or blocking you should move to a/thread.
Now in my example, creating the flowers is very quick, so you could switch to a/go, but if creating the flowers took more compute time or needing to do some blocking IO, using a/thread would make more sense.
So it'll depend on that in your case to choose which one to use.
If you switch to a/go though, make sure to change the Thread/sleep to a parking on a timeout chan instead (a/<! (a/timeout ms))
, because Tread/sleep is blocking and like I said, you should not do anything blocking inside an a/go.
beginner question/sanity check: in clojurescript, I have a collection of data, order irrelevant. for each item, I need to compute something (using an external lib) which returns a couple of promises. I'm thinking of creating a single result chan, run each item in a go
block in which I Promise.all the computation results and put them on the result chan. I read from that chan in a go-loop
and aggregate until all results are in. is that a reasonable way or is there something simpler I can do?
(defn process-item [item >res]
(let [sub-items (divide-item item)
results* (->> (map lib-fn sub-items)
(.all js/Promise))]
(go (>! >res (<p! results*)))
(defn process [items]
(let [>res (async/chan)]
(run! #(process-item % >res))
(go-loop [agg {}]
(if (done? agg)
agg
(recur (aggregate agg (<! >res)))
Can't you just loop over the items, divide them, call the lib, and then wait for all results, and then aggregate.
(defn process-items [items]
(->> (for [item items
sub-items (divide-item item)]
(map lib-fn sub-items))
(<p! (.all js/Promise))))
Unless I missed something about what you're trying to do?huh, I was under the impression that <p! must be called in go block
thank you!
Oh, you might be right about that. You can use p->c
instead then: https://github.com/clojure/core.async/blob/master/src/main/clojure/cljs/core/async/interop.cljs#L13