This page is not created by, affiliated with, or supported by Slack Technologies, Inc.
2017-02-06
Channels
- # aleph (2)
- # aws (3)
- # bangalore-clj (3)
- # beginners (119)
- # boot (263)
- # cider (13)
- # cljs-dev (16)
- # clojars (2)
- # clojure (114)
- # clojure-austin (1)
- # clojure-chicago (1)
- # clojure-finland (1)
- # clojure-france (24)
- # clojure-italy (6)
- # clojure-russia (28)
- # clojure-serbia (7)
- # clojure-spain (1)
- # clojure-spec (89)
- # clojure-uk (139)
- # clojurescript (216)
- # community-development (3)
- # core-async (135)
- # css (2)
- # cursive (31)
- # datomic (44)
- # emacs (15)
- # hoplon (2)
- # jobs (3)
- # lein-figwheel (14)
- # leiningen (2)
- # lumo (21)
- # off-topic (16)
- # om (7)
- # om-next (1)
- # onyx (53)
- # perun (9)
- # planck (15)
- # portland-or (29)
- # protorepl (2)
- # re-frame (32)
- # reagent (8)
- # ring-swagger (22)
- # rum (51)
- # spacemacs (4)
- # untangled (2)
exception catching in go blocks is pretty broken http://dev.clojure.org/jira/browse/ASYNC-169
Thread$UncaughtExceptionHandler is also not very reliable; exception catching can happen anywhere, and then your handler never gets called
in fact I think the task that runs go blocks has a try/catch so you'll never get anything reported there
it has been a while since I wrote the patch on that issue, so the details are not fresh in my mind, but you could very easily be getting exceptions that are not being logged for whatever reason
the fact that core.async releases have been cut with exception handling in the go macro being so broken is rather frustrating. There is actually a patch on another similar issue that predates mine, which no longer applies cleanly, and I didn't notice until I had written a patch to fix it (and a number of related exception handling issues)
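To illustrate the point above, here is a minimal sketch (the function name is mine, not from the thread) showing that a catch inside the go body itself is the one dependable place to observe a failure:

```clojure
(require '[clojure.core.async :as async])

;; An exception thrown inside a go block is easy to lose silently:
;; nothing gets printed unless you catch it in the body yourself.
;; `safe-go-demo` is a made-up name for illustration.
(defn safe-go-demo []
  (let [c (async/go
            (try
              (throw (ex-info "boom" {:where :go-block}))
              (catch Throwable t
                ;; the explicit catch inside the go body is the only
                ;; reliable place to see the failure
                {:error (ex-message t)})))]
    (async/<!! c)))
```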
@hiredman Oh right, totally forgot about this. I even had the pleasure to run into this myself before (http://dev.clojure.org/jira/browse/ASYNC-168). I still have that bookmarked to finally try your patch. Any clue why the issue has been idle for so long now?
@hiredman that sounds pretty bad. I thought I had all the cases covered where an exception might pop up.
Do you guys think there is anything wrong with this?
(defn throttle-chan [in-chan out-chan relimit-chan]
(let [allowance (atom 0)]
(async/go-loop [f (async/<! relimit-chan)]
(swap! allowance f)
(recur (async/<! relimit-chan)))
(async/go-loop [data (async/<! in-chan)
buff (async/chan (async/sliding-buffer 1))]
(swap!
allowance
(fn [old]
(if (< 0 old)
(do (async/go (async/>! buff data))
(- old 1))
old)))
(async/>! out-chan (async/<! buff))
(recur
(async/<! in-chan)
(async/chan (async/sliding-buffer 1))))))
I get around the fact that atoms may invoke their swap function more than once if the resource is contested by putting a sliding buffer before the out-chan
if allowance is zero or less, the swap will still complete but the go-loop will wait forever for buff to receive data
@bcbradley Can you explain what exactly your function is supposed to do? I associate throttling with something like limiting the message frequency of a channel but I don't see a timing element in your code so you must be thinking of something else.
@dergutemoritz it is supposed to pipe things coming from the in-chan to the out-chan while it has an allowance to do so
you can increase the allowance at whatever rate you want by supplying something like (partial + 1) to relimit-chan
Alright, thanks for clarifying!
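As a usage sketch of the relimit idea above (the refill function and interval are hypothetical, not from the thread), the allowance could be bumped on a timer:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical refill loop: every ms milliseconds, send a function
;; to relimit-chan that raises the allowance by n. The loop stops
;; once relimit-chan is closed.
(defn start-refill! [relimit-chan n ms]
  (async/go-loop []
    (async/<! (async/timeout ms))
    (when (async/>! relimit-chan (partial + n))
      (recur))))
```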
also your atom swap fn has side effects
atom swap fns can be called an arbitrary number of times
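That retry behaviour is easy to demonstrate. In this sketch (names are mine), the call counter usually ends up higher than the number of successful swaps under contention:

```clojure
;; swap! may invoke its update fn several times when the atom is
;; contended, so any side effect inside it can run more than once.
(defn contended-swap-demo []
  (let [a       (atom 0)
        calls   (java.util.concurrent.atomic.AtomicLong. 0)
        threads (mapv (fn [_]
                        (Thread.
                         (fn []
                           (dotimes [_ 1000]
                             (swap! a (fn [x]
                                        (.incrementAndGet calls) ; side effect!
                                        (inc x)))))))
                      (range 8))]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    ;; the atom's value is exactly 8000, but `calls` is typically
    ;; higher: retried invocations re-ran the side effect
    {:value @a :calls (.get calls)}))
```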
I'll think about it a bit and get back to you if I come up with a good idea 🙂
(defn throttle-chan [in-chan out-chan relimit-chan]
(let [allowance (atom 0)]
(async/go-loop [f (async/<! relimit-chan)]
(swap! allowance f)
(recur (async/<! relimit-chan)))
(add-watch allowance :allower
(fn [_ _ _ new]
(when (< 0 new)
(async/go
(let [data (async/<! in-chan)
buff (async/chan (async/sliding-buffer 1))]
(swap! allowance
(fn [old]
(async/>! buff data)
(max 0 (dec old))))
(async/>! out-chan (async/<! buff)))))))))
watches can have problems as well
Watches are run on the thread that calls swap! so they can end up running in parallel
Is this basically a variable buffer size?
Oh yeah, even more reason to avoid them if possible 🙂
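The thread behaviour just described can be checked directly; this small sketch (my own, not from the thread) shows the watch firing synchronously on the thread that called swap!:

```clojure
;; add-watch handlers run on whichever thread performs the swap!,
;; so concurrent swaps mean concurrent watch invocations.
(defn watch-thread-demo []
  (let [a    (atom 0)
        seen (promise)]
    (add-watch a :w (fn [_key _ref _old _new]
                      (deliver seen (Thread/currentThread))))
    (swap! a inc)
    ;; the watch ran on this very thread
    (identical? (Thread/currentThread) @seen)))
```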
@tbaldridge Do you have any info on the exception situation that came up earlier?
i don't understand the syntax on line 4 with the vector as the first element of the list
@bcbradley My version has the slight disadvantage that putting to relimit-chan could block if the loop is currently blocked by putting to out-chan, but that might not be an issue in practice.
@bcbradley That's the syntax of async/alt!, look it up in its doc string
@dergutemoritz upvote the ticket or ping @alexmiller about http://dev.clojure.org/jira/browse/ASYNC-169
The spinning loop problem with core.async is a thing, and we have a patch for it, so I'm not sure where it needs to go from there.
@dergutemoritz so here's another option... there's an internal protocol for buffers that's fairly simple and would make this code rather clean. However, it's not a public specification, so core.async may change the interface in the future
On the other hand it hasn't changed in like 3-4 years, so it probably won't change
It's pretty trivial to write a custom buffer that calculates full? based on the value in an atom minus the current buffer size.
but....like that protocol was last changed 1.5 years ago
So it's up to you if you want to risk it or not.
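For reference, a sketch of what such a buffer might look like. It uses clojure.core.async.impl.protocols/Buffer (full?, remove!, add!*, close-buf!), which is the unsupported internal protocol mentioned above and could change in a future release:

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])
(import '(java.util LinkedList))

;; Sketch of a buffer whose capacity is read from an atom on every
;; put: it reports full whenever it holds at least @allowance items.
;; This leans on unsupported internals; use at your own risk.
(deftype ThrottleBuffer [^LinkedList items allowance]
  impl/Buffer
  (full? [_] (>= (.size items) @allowance))
  (remove! [_] (.removeLast items))
  (add!* [this itm] (.addFirst items itm) this)
  (close-buf! [_])
  clojure.lang.Counted
  (count [_] (.size items)))

(defn throttle-buffer [allowance]
  (ThrottleBuffer. (LinkedList.) allowance))
```

Growing the atom then makes previously rejected puts start succeeding again, which is the throttling behaviour being discussed.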
if i made a throttle-buffer using that protocol do you think it might break in the future?
@tbaldridge Thanks, upvoted 🙂
@bcbradley I'd put it in a function that's fairly abstract. And if the protocol ever changes, I'd rewrite the function
by abstract I mean that all the details of how you implement this throttle shouldn't escape from that function.
but i think i can work around that like this:
(defn- throttle-chan [in-chan out-chan relimit-chan]
(let [buff-chan (async/chan)]
(async/go-loop [allowance 0]
(async/alt!
relimit-chan
([f] (recur (f allowance)))
in-chan
([val] (if (< 0 allowance)
(do (async/>! buff-chan val)
(recur (dec allowance)))
(recur allowance)))))
(async/pipe buff-chan out-chan)))
@bcbradley That would exhibit the same problem I'm afraid
async/pipe would only pick up the next value from buff-chan after it has succeeded in putting the previous one
yeah, anything that takes instant action when the buffer size changes is going to be hard to implement
i really don't want to dig around in the murky depths of core async's implementation
um...your function isn't bad, you just need a small tweak:
The problem is that you're blocking in the second part of your alt.
this is going to get a tad ugly, but here's the gist: if your buffer is not full, then you need to accept new values from relimit or from in, as well as try to put values into the output channel
if your buffer is full, you still need an alt, but this time only on relimit or the output
How about going with the atom after all but instead of directly swapping it for decreasing the allowance, also go through relimit-chan
none of this really requires a spin.
What might help you is to write this out as a flow graph on some paper, your existing code is missing an alt! where you currently have >! buff-chan val
That serializes the updates to the atom so you get rid of those nasty concurrency issues at least
but you don't need an atom at all
@tbaldridge What to do if you can't put to out-chan without blocking, though? Dropping doesn't seem to be an option
what would happen if you received a package in the relimit-chan setting the atom to 0 in the first go-loop at the same time that you dereference the atom in the second go-loop?
you use an alt! to put on the out-chan
@tbaldridge I mean what else do you put in the alt!?
you alt between out-chan and the relimit-chan
Ah right, of course
And add another loop around that
if you get a new value in the relimit-chan, modify your local value; if the buffer size is still too small, alt again on the out+relimit chans. Else alt on all three
right
Yeah, that's nicer than fudging around with the atom 🙂
you might even be able to do something elegant with alts! which takes a collection of channels
Like (if (buffer-has-room? ..) (conj chans in-chan) chans))
Oh yeah, you could make the collection of channels part of the loop state
Pretty neat
Take that, golang
yeah, take a look at alts!
it's pretty simple (alt! is just alts! + a case statement)
@bcbradley The basic idea is to include relimit-chan in every potentially blocking operation via alt! (or alts!, respectively) so that you will always be able to service it
and then think of your problem this way: you want to always read from one channel (relimit), sometimes send to one channel (out, if you have a value to send) and sometimes read from one channel (in) if you have room in the buffer.
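Putting the whole discussion together, a sketch of how the loop might look (my own assembly of the advice above, not code from the thread; channel close handling is deliberately ignored):

```clojure
(require '[clojure.core.async :as async])

;; allowance is plain loop state; relimit-chan participates in every
;; alts!; in-chan is only eligible while there is allowance and no
;; value pending; a pending value is offered to out-chan as a put op.
(defn throttle-chan [in-chan out-chan relimit-chan]
  (async/go-loop [allowance 0
                  pending   nil]
    (let [ops (cond-> [relimit-chan]
                pending
                (conj [out-chan pending])          ; put pending to out
                (and (nil? pending) (pos? allowance))
                (conj in-chan))                    ; accept new input
          [val port] (async/alts! ops)]
      (cond
        (= port relimit-chan) (recur (val allowance) pending)
        (= port out-chan)     (recur allowance nil) ; put succeeded
        :else                 (recur (dec allowance) val)))))
```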
You're welcome, that was a fun problem 🙂
i figure if i build the pieces out of reusable stuff i could make wrappers for other apis
@bcbradley For caching, be sure to check out https://github.com/clojure/core.cache
the way I have it set up is my functions immediately return a channel that will eventually contain the result of the request
a cache would basically be a function in the call stack that doesn't invoke the functions which access the remote service if it has the value for the request cached
but in order to grow the cache, the result of the request that is eventually pushed to the return value channel needs to find its way into the cache; however, that happens upstream in the call stack
thus the cache-function needs to invoke the service-function with a "control-chan" on each request
it looks something like
;; control-chan is in-band, found as a key/value in the maps sent out over out-chans
(defn- cache [in-chan out-chan]
(let [store (atom {})]
(async/go-loop [data (async/<! in-chan)]
(if-let [remembered (find @store (select-keys data [:method :path :query :body]))]
(async/>! (:return-chan data) (val remembered))
(let [control (async/chan 1)]
(async/>! out-chan (assoc data :cache-control control))
(async/go (let [[k v] (async/<! control)] (swap! store assoc (select-keys k [:method :path :query :body]) v)))))
(recur (async/<! in-chan)))))
The library just gives you various cache strategies implemented as pure functions, so you should be able to plug those into your setup in the right spot.
Right, should be able to just plug them into your swap! there
Otherwise you'll have to roll your own cache eviction strategy and stuff
Oh so it will just keep everything it ever saw forever?
Then a bare atom is fine, of course
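For the record, wiring a core.cache strategy into an atom-backed store like the one above could look like this sketch (store and lookup-or-compute are names I made up; lru-cache-factory and through-cache are real core.cache functions):

```clojure
(require '[clojure.core.cache :as cache])

;; The cache value lives in an atom and every update goes through
;; swap!, so the library's pure eviction logic handles forgetting
;; old entries. Tiny :threshold just for demonstration.
(def store (atom (cache/lru-cache-factory {} :threshold 2)))

(defn lookup-or-compute
  "Return the cached value for k, computing and caching it on a miss."
  [k compute-fn]
  (-> (swap! store cache/through-cache k (fn [_] (compute-fn)))
      (cache/lookup k)))
```

Swapping lru-cache-factory for, say, a TTL or LFU factory changes the eviction strategy without touching the lookup code.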