#core-async
2021-12-19
Benjamin 09:12:46

Does this make sense to you? I have a stream of events that I'd like to reply to, but I want to throttle the replies per Discord channel id. I'm thinking of using pub/sub and creating the subs on demand somehow (the channel ids are only known at runtime).

Benjamin 10:12:21

ended up with this, probably awful for some reason 😛

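(defn <throttle-by-topic
  "Routes each message from <in to a per-topic channel, throttles every topic
  independently, and pipes the results onto >out. Returns >out."
  [<in topic-fn >out]
  (let [topic->c (atom {})
        ;; Only the go-loop below calls ensure-sub, so check-then-swap! is safe here.
        ensure-sub (fn [topic]
                     (or (@topic->c topic)
                         ;; sliding-buffer 1 keeps only the newest message per topic
                         (let [c1 (a/chan (a/sliding-buffer 1))]
                           ;; pass false so one topic closing never closes the shared >out
                           (a/pipe (<throttle c1 2000) >out false)
                           (swap! topic->c assoc topic c1)
                           c1)))]
    (a/go-loop []
      (if-some [m (a/<! <in)]
        (do
          ;; messages without a topic are skipped instead of ending the loop
          (when-some [topic (topic-fn m)]
            (a/>! (ensure-sub topic) m))
          (recur))
        ;; <in closed: close every per-topic channel
        (run! a/close! (vals @topic->c))))
    >out))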
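
The snippet above calls `<throttle` without defining it. A minimal sketch of what such a helper might look like, assuming it takes an input channel and an interval in milliseconds and returns a new channel (this contract is a guess, not Benjamin's actual implementation):

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: the name and argument order mirror the call
;; (<throttle c1 2000) above, but the body is only an assumption.
;; Forwards one value from <in, then waits ms milliseconds before taking
;; the next one. Closes the returned channel when <in closes.
(defn <throttle
  [<in ms]
  (let [>out (a/chan)]
    (a/go-loop []
      (if-some [v (a/<! <in)]
        (do
          (a/>! >out v)
          (a/<! (a/timeout ms))
          (recur))
        (a/close! >out)))
    >out))
```

Combined with a `sliding-buffer` of size 1 on the input, values that arrive during the wait overwrite each other, so only the newest one per interval gets through.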

Drew Verlee 07:12:48

I started thinking about your problem, then I started reading this post: https://github.com/ptaoussanis/sente/issues/124

Drew Verlee 07:12:32

I believe Zach built a lib around the idea of windowing and backpressure because he couldn't make core.async work the way he needed.

Drew Verlee 07:12:57

https://github.com/clj-commons/manifold/blob/master/doc/stream.md talks about throttling, but I didn't look at the implementation at all.

Benjamin 09:12:21

gonna check 👀