Fork me on GitHub
#core-async
<
2020-07-27
>
jjttjj18:07:15

I want to pipe one channel's values to another at a strict rate limit, no bursts, just ct messages per ms, starting upon getting the first message. Is there an obvious better way to write this:

(defn limit [ct ms in out]
  (go
    (let [fst (<! in)
          time  (a/timeout ms)]
      (>! out fst)
      (dotimes [_ (dec ct)]
        (>! out (<! in)))
      (<! time)
      (loop [time (a/timeout ms)]
        (dotimes [_ ct]
          (>! out (<! in)))
        (<! time)
        (recur (a/timeout ms))))))

(def a (chan))
(def b (chan))

(go-loop []
  (when-some [x (<! b)]
    (println "got" x "from b")
    (recur)))

(limit 5 1000 a b)

(dotimes [x 20] (a/put! a x))

noisesmith19:07:45

@jjttjj so you want to wait indefinitely if less than ct messages come in before the timeout?

jjttjj19:07:42

good point. I don't think so. So I think I need a second loop to put stuff on the timeout chan every x ms

noisesmith19:07:38

also, it seems like you could move the duplicate dotimes into the loop, by adding a parameter for ct (always using ct from the recur, using (dec ct) on the first call)

noisesmith19:07:48

(defn limit [ct ms in out]
  (go
    (let [fst (<! in)]
      (>! out fst)
      (loop [n (dec ct)
             time (a/timeout ms)]
         (dotimes [_ n]
            (>! out (<! in)))
         (<! time)
         (recur cnt (a/timeout ms))))))

jjttjj19:07:03

oh yeah, awesome, I knew I was missing something like that

noisesmith19:07:48

also, if time was eg. a delay, you could start it with the first message of each cycle

(defn limit [ct ms in out]
  (go-loop [td (delay (timeout ms))]
     (dotimes [_ n]
       (>! out (<! in))
       (force td))
      (<! @td)
      (recur (delay (timeout ms))))

noisesmith19:07:14

significantly simplified code, if you are OK with the changed behavior (each timer starting with the first item in that time range)

jjttjj19:07:40

yes, that's actually what I want

noisesmith19:07:10

just fixed it - the force of td still has to be after the first take (after the first force, the rest are no-ops)

Ian Fernandez19:07:27

async channels implements closeable?

noisesmith19:07:18

looks like no

Clojure 1.10.1
(ins)user=> (require '[clojure.core.async :as >])
nil
(ins)user=> (.close (>/chan))
Execution error (IllegalArgumentException) at user/eval9270 (REPL:1).
No matching field found: close for class clojure.core.async.impl.channels.ManyToManyChannel

Ian Fernandez19:07:41

hum :thinking_face:

noisesmith19:07:04

more generally

(ins)user=> (pprint (supers (class (>/chan))))
#{clojure.lang.IType clojure.core.async.impl.protocols.WritePort
  clojure.core.async.impl.protocols.Channel
  clojure.core.async.impl.channels.MMC
  clojure.core.async.impl.protocols.ReadPort java.lang.Object}
nil

Ian Fernandez19:07:25

wow supers! thanks!

Ian Fernandez19:07:39

supers is in core?

noisesmith19:07:09

yes, given a class it gives a set of superclasses (which will mostly be interfaces of course)

Ian Fernandez19:07:39

yup, understood, I use in general clojure.reflect/reflect, good to know =D

Ian Fernandez19:07:07

it would be awesome if was