#core-async
2017-02-08
bcbradley01:02:16

Yeah I think I like the version you just posted, it's a bit easier to read

bcbradley01:02:36

I actually made something similar to that version after reading up on alts!

bcbradley01:02:30

I can't remember why I didn't stick with it

bcbradley03:02:37

I made some changes to make it work correctly with closed channels, here is what I have so far for my two utility functions:

(defn map-pipe
  "Takes values from in-chan and puts a mapped value to out-chan if it exists. This map can be updated at any time by passing a function of one argument through update-chan. If update-chan or out-chan is closed, values are no longer taken from in-chan or put to out-chan. If in-chan is closed, out-chan is closed but update-chan is not."
  [in-chan out-chan update-chan]
  (async/go-loop [store {}]
    (let [[msg chan] (async/alts! [update-chan in-chan] :priority true)]
      (if (= chan update-chan)
        ;; msg is an update function; nil means update-chan was closed, so stop.
        (when-not (nil? msg)
          (recur (msg store)))
        (if (nil? msg)
          ;; in-chan was closed.
          (async/close! out-chan)
          ;; Destructure only here, since update messages are functions.
          (let [[k] msg
                v (get store k)]
            ;; Skip unmapped keys; stop if the put fails because out-chan is closed.
            (when (or (not v) (async/>! out-chan v))
              (recur store))))))))
(defn throttle-pipe
  "Pipes a finite number of values from in-chan to out-chan. This number can be updated at any time by passing a function of one argument through update-chan. If update-chan or out-chan is closed, values are no longer piped. If in-chan is closed, out-chan is closed but update-chan is not."
  [in-chan out-chan update-chan]
  (async/go-loop [allowance 0]
    (let [[msg chan] (async/alts! (if (< 0 allowance) [update-chan in-chan] [update-chan]) :priority true)]
      (if (= chan update-chan)
        (when-not (nil? msg)
          (recur (msg allowance)))
        (if (nil? msg)
          (async/close! out-chan)
          (when (async/>! out-chan msg)
            (recur (dec allowance))))))))
I also renamed them to better represent what they actually are. I was wondering if it seems weird to not close the pipes if the update-chan is closed though. What do you think?
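
For reference, here is a minimal usage sketch of throttle-pipe as described above. It assumes `clojure.core.async` is aliased as `async` and repeats the function with `close!` namespace-qualified so the snippet runs on its own. The buffered channels, the sample values, and the 200 ms timeout are hypothetical choices for illustration only.

```clojure
(require '[clojure.core.async :as async])

;; Copy of throttle-pipe from the message above, with close! qualified.
(defn throttle-pipe
  [in-chan out-chan update-chan]
  (async/go-loop [allowance 0]
    (let [[msg chan] (async/alts! (if (< 0 allowance) [update-chan in-chan] [update-chan])
                                  :priority true)]
      (if (= chan update-chan)
        (when-not (nil? msg)
          (recur (msg allowance)))
        (if (nil? msg)
          (async/close! out-chan)
          (when (async/>! out-chan msg)
            (recur (dec allowance))))))))

;; Hypothetical channels, buffered so the blocking puts below return at once.
(def in-chan (async/chan 10))
(def out-chan (async/chan 10))
(def update-chan (async/chan 10))

(throttle-pipe in-chan out-chan update-chan)

;; Queue three values. The allowance starts at 0, so nothing is piped yet.
(doseq [x [:a :b :c]]
  (async/>!! in-chan x))

;; Raise the allowance by 2 by sending a function of one argument.
(async/>!! update-chan #(+ % 2))

;; Exactly two values should come through.
(def received
  [(async/<!! out-chan) (async/<!! out-chan)])

;; The third value stays in in-chan, so this take falls through to the timeout.
(def third
  (first (async/alts!! [out-chan (async/timeout 200)])))
```

With the allowance used up, the loop only listens on update-chan, so a further update function is needed before `:c` is piped. It also means a close of in-chan goes unnoticed while the allowance is 0.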