Fork me on GitHub
#core-async
<
2022-01-14
>
didibus06:01:19

@benjamin.schwerdtner This is what I came up with when I thought how I'd do it:

(defn make-flower-sender
  []
  (let [stopping-chan (a/promise-chan)
        flowers [:lily :rose :tulip :orchid]
        destinations (atom {})]
    (a/thread
      (try
        (loop []
          (when-not (a/poll! stopping-chan)
            (doseq [[_country cities] @destinations
                    :let [city (rand-nth (keys cities))
                          city-chan (city cities)]]
              (println "putting into " city)
              (a/put! city-chan (rand-nth flowers)))
            (Thread/sleep 1000)
            (recur)))
        (catch Exception e
          (println e))))
    {:stop #(a/put! stopping-chan true)
     :new-destination (fn[country city]
                        (let [c (a/chan (a/dropping-buffer 1))]
                          (swap! destinations
                                 update country assoc city c)
                          c))}))

(def flower-sender (make-flower-sender))

(def usa-seattle ((:new-destination flower-sender) :usa :seattle))
(def usa-new-york ((:new-destination flower-sender) :usa :new-york))
(def canada-toronto ((:new-destination flower-sender) :canada :toronto))

(a/poll! usa-seattle)
(a/poll! usa-new-york)
(a/poll! canada-toronto)

((:stop flower-sender))

didibus06:01:30

You can add new country/city combinations, it creates a new channel for it and starts sending flowers to them (only one city each second per country). Then I'd assume you'd have things that want to receive the flowers per city and process them, they can just do so on the channel returned by new-destination. When you want to stop the sending of flower, call stop.

👀 1
Benjamin08:01:08

thanks that's helping me a lot

Benjamin08:01:23

what are the reaasons to use a/thread vs a/go ?

didibus23:01:57

When you use a/go, you're running on the async pool. That pool you should almost think of it as a single thread. So think of everything you run inside a/go as a single thread. That means if you run anything which takes a while until it either finishes running or parks, it is hogging the thread and nothing else can run. When you want concurrency and high responsiveness, you want to make sure everything waits as little as possible before getting a turn. That means you should exclusively restrict the use of a/go for very short running computations or non-blocking io. Anything that is longer or blocking you should move to a/thread. Now in my example, creating the flowers is very quick, so you could switch to a/go, but if creating the flowers took more compute time or needing to do some blocking IO, using a/thread would make more sense. So it'll depend on that in your case to choose which one to use. If you switch to a/go though, make sure to change the Thread/sleep to a parking on a timeout chan instead (a/<! (a/timeout ms)), because Tread/sleep is blocking and like I said, you should not do anything blocking inside an a/go.

michihuber11:01:23

beginner question/sanity check: in clojurescript, I have a collection of data, order irrelevant. for each item, I need to compute something (using an external lib) which returns a couple of promises. I'm thinking of creating a single result chan, run each item in a go block in which I Promise.all the computation results and put them on the result chan. I read from that chan in a go-loop and aggregate until all results are in. is that a reasonable way or is there something simpler I can do?

michihuber11:01:28

(defn process-item [item >res]
  (let [sub-items (divide-item item)
        results* (->> (map lib-fn sub-items)
                      (.all js/Promise))]
    (go (>! >res (<p! results*)))

(defn process [items]
  (let [>res (async/chan)]
    (run! #(process-item % >res))
    (go-loop [agg {}]
       (if (done? agg)
         agg
         (recur (aggregate agg (<! >res)))

lilactown16:01:22

how would you like to handle errors in the calculation?

didibus07:01:54

Doesn't seem you need core.async at all

didibus07:01:41

Can't you just loop over the items, divide them, call the lib, and then wait for all results, and then aggregate.

didibus07:01:50

(defn process-items [items]
  (->> (for [item items
             sub-items (divide-item item)]
         (map lib-fn sub-items))
       (<p! (.all js/Promise))))
Unless I missed something about what you're trying to do?

michihuber08:01:17

huh, I was under the impression that <p! must be called in go block

didibus23:01:16

The only difference is that <p! will throw if the promise is rejected, where as p->c just returns the resolved value on the channel, be it rejected or not. If rejected it will put an ex-info on the channel.