This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2020-07-27
I want to pipe one channel's values to another at a strict rate limit with no bursts, just ct messages per ms milliseconds, starting when the first message arrives. Is there an obviously better way to write this:
(defn limit [ct ms in out]
  (go
    (let [fst  (<! in)
          time (a/timeout ms)]
      (>! out fst)
      (dotimes [_ (dec ct)]
        (>! out (<! in)))
      (<! time)
      (loop [time (a/timeout ms)]
        (dotimes [_ ct]
          (>! out (<! in)))
        (<! time)
        (recur (a/timeout ms))))))
(def a (chan))
(def b (chan))
(go-loop []
  (when-some [x (<! b)]
    (println "got" x "from b")
    (recur)))
(limit 5 1000 a b)
(dotimes [x 20] (a/put! a x))
@jjttjj so you want to wait indefinitely if fewer than ct messages come in before the timeout?
good point. I don't think so. So I think I need a second loop to put stuff on the timeout chan every x ms
also, it seems like you could move the duplicate dotimes into the loop, by adding a parameter for ct (always using ct from the recur, using (dec ct) on the first call)
(defn limit [ct ms in out]
  (go
    (let [fst (<! in)]
      (>! out fst)
      ;; n is (dec ct) for the first cycle (fst was already sent), ct afterwards
      (loop [n    (dec ct)
             time (a/timeout ms)]
        (dotimes [_ n]
          (>! out (<! in)))
        (<! time)
        (recur ct (a/timeout ms))))))
also, if time was e.g. a delay, you could start it with the first message of each cycle
(defn limit [ct ms in out]
  (go-loop [td (delay (a/timeout ms))]
    (dotimes [_ ct]
      (>! out (<! in))
      ;; the first force starts the timer; later forces are no-ops
      (force td))
    (<! @td)
    (recur (delay (a/timeout ms)))))
significantly simplified code, if you are OK with the changed behavior (each timer starting with the first item in that time range)
just fixed it - the force of td still has to be after the first take (after the first force, the rest are no-ops)
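On the earlier question about fewer than ct messages arriving before the timeout: one way to avoid waiting indefinitely might be to race the timer against the input with alts!. The following is only a sketch under assumptions, not something posted in the thread: the name limit-windowed is hypothetical, each window starts at its first message (as in the delay version above), and out is closed once in closes, which the original limit does not do.

```clojure
(require '[clojure.core.async :as a :refer [go-loop <! >! chan]])

(defn limit-windowed
  "Hypothetical variant of limit: forwards at most ct values per window of
  ms milliseconds. A window opens with its first value and ends when its
  timer fires, even if fewer than ct values arrived."
  [ct ms in out]
  (go-loop []
    (if-some [fst (<! in)]
      (do
        (>! out fst)
        (let [timer (a/timeout ms)
              ;; true if the window ended normally, false if in was closed
              open? (loop [n (dec ct)]
                      (if (pos? n)
                        (let [[v port] (a/alts! [timer in])]
                          (cond
                            (= port timer) true   ; window expired early
                            (nil? v)       false  ; in was closed
                            :else (do (>! out v)
                                      (recur (dec n)))))
                        ;; quota used up: wait out the rest of the window
                        (do (<! timer) true)))]
          (if open?
            (recur)
            (a/close! out))))
      ;; assumption: propagate the close of in to out
      (a/close! out))))
```

It can be called the same way as the original, e.g. (limit-windowed 5 1000 a b).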
do async channels implement Closeable?
looks like no
Clojure 1.10.1
(ins)user=> (require '[clojure.core.async :as >])
nil
(ins)user=> (.close (>/chan))
Execution error (IllegalArgumentException) at user/eval9270 (REPL:1).
No matching field found: close for class clojure.core.async.impl.channels.ManyToManyChannel
hum :thinking_face:
more generally
(ins)user=> (pprint (supers (class (>/chan))))
#{clojure.lang.IType clojure.core.async.impl.protocols.WritePort
clojure.core.async.impl.protocols.Channel
clojure.core.async.impl.channels.MMC
clojure.core.async.impl.protocols.ReadPort java.lang.Object}
nil
wow supers! thanks!
supers is in core?
yes, given a class it gives a set of superclasses (which will mostly be interfaces of course)
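As a small illustration of that use of supers, here is a sketch that checks whether a class has java.io.Closeable among its superclasses and interfaces. The helper name closeable-class? is made up for this example, and the classes tested are plain JDK classes rather than core.async ones.

```clojure
(defn closeable-class?
  "Hypothetical helper: true if java.io.Closeable appears in (supers c).
  Note that supers does not include the class itself."
  [^Class c]
  (contains? (supers c) java.io.Closeable))

;; java.io.Reader implements Closeable, so its subclasses qualify
(println (closeable-class? java.io.StringReader))
;; String has no Closeable in its hierarchy
(println (closeable-class? String))
```

The same check applied to (class (chan)) would come back false, matching the supers output shown above.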
yup, understood. I generally use clojure.reflect/reflect, so this is good to know =D
it would be awesome if was