#core-async
2017-02-06
hiredman03:02:39

exception catching in go blocks is pretty broken http://dev.clojure.org/jira/browse/ASYNC-169

hiredman03:02:18

Thread$UncaughtExceptionHandler is also not very reliable; exception catching can happen anywhere, and then your handler never gets called

hiredman03:02:36

in fact I think the task that runs go blocks has a try/catch so you'll never get anything reported there

hiredman03:02:42

(that all assumes using go blocks)

hiredman03:02:40

it has been a while since I wrote the patch on that issue, so the details are not fresh in my mind, but you could very easily be getting exceptions that are not being logged for whatever reason

hiredman03:02:59

the fact that core.async releases have been cut with exception handling in the go macro being so broken is rather frustrating. There is actually a patch on another similar issue that predates mine, which no longer applied cleanly, and I didn't notice it until I had written a patch to fix it (and a number of related exception handling issues)

dergutemoritz08:02:51

@hiredman Oh right, totally forgot about this. I even had the pleasure to run into this myself before (http://dev.clojure.org/jira/browse/ASYNC-168). I still have that bookmarked to finally try your patch. Any clue why the issue has been idle for so long now?

ska08:02:53

@hiredman that sounds pretty bad. I thought I had all the cases covered where an exception might pop up.

bcbradley14:02:37

I'm fooling around with core-async and am trying to implement a throttle chan

bcbradley14:02:55

Do you guys think there is anything wrong with this?

(defn throttle-chan [in-chan out-chan relimit-chan]
  (let [allowance (atom 0)]
    (async/go-loop [f (async/<! relimit-chan)]
      (swap! allowance f)
      (recur (async/<! relimit-chan)))
    (async/go-loop [data (async/<! in-chan)
                    buff (async/chan (async/sliding-buffer 1))]
      (swap!
        allowance
        (fn [old]
          (if (< 0 old)
            (do (async/go (async/>! buff data))
              (- old 1))
            old)))
      (async/>! out-chan (async/<! buff))
      (recur
        (async/<! in-chan)
        (async/chan (async/sliding-buffer 1))))))

bcbradley14:02:18

I get around the fact that atoms may invoke their swap function more than once if the resource is contested by putting a sliding buffer before the out-chan

bcbradley14:02:49

I haven't really tested this though, I'm just brainstorming

bcbradley14:02:45

hrm actually nevermind i see a problem

bcbradley14:02:57

if allowance is zero or less the swap will still complete but go-loop will wait forever for buff to receive data

dergutemoritz15:02:05

@bcbradley Can you explain what exactly your function is supposed to do? I associate throttling with something like limiting the message frequency of a channel but I don't see a timing element in your code so you must be thinking of something else.

bcbradley15:02:16

@dergutemoritz it is supposed to pipe things coming from the in-chan to the out-chan while it has an allowance to do so

bcbradley15:02:44

you can increase the allowance at whatever rate you want by supplying something like (partial + 1) to relimit-chan

bcbradley15:02:05

or you could set it to zero with (constantly 0)
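(For illustration, the relimit channel would typically be fed from a timer, which supplies the timing element asked about below. A hypothetical usage sketch, assuming the throttle-chan above:)

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical usage: grant one unit of allowance every 100 ms by
;; sending inc (same as (partial + 1)) over relimit-chan; sending
;; (constantly 0) instead would revoke all allowance.
(def relimit-chan (async/chan))

(async/go-loop []
  (async/<! (async/timeout 100))
  (async/>! relimit-chan inc)
  (recur))
```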

bcbradley15:02:26

it has a bug in it though, as I explained

dergutemoritz15:02:54

Alright, thanks for clarifying!

bcbradley15:02:56

I'm trying to figure out the best way to do this without spinlocking

donaldball15:02:06

also your atom swap fn has side effects

bcbradley15:02:14

yeah thats what i was talking about earlier

donaldball15:02:16

atom swap fns can be called an arbitrary number of times

bcbradley15:02:23

the side effects are irrelevant if they are idempotent

dergutemoritz15:02:27

I'll think about it a bit and get back to you if I come up with a good idea 🙂

bcbradley15:02:19

I think i might have fixed it i don't know

bcbradley15:02:29

(defn throttle-chan [in-chan out-chan relimit-chan]
  (let [allowance (atom 0)]
    (async/go-loop [f (async/<! relimit-chan)]
      (swap! allowance f)
      (recur (async/<! relimit-chan)))
    (add-watch allowance :allower
      (fn [_ _ _ new]
        (when (< 0 new)
          (async/go
            (let [data (async/<! in-chan)
                  buff (async/chan (async/sliding-buffer 1))]
              (swap! allowance
                (fn [old]
                  (async/>! buff data)
                  (max 0 (dec old))))
              (async/>! out-chan (async/<! buff)))))))))

tbaldridge16:02:30

watches can have problems as well

tbaldridge16:02:50

Watches are run on the thread that calls swap! so they can end up running in parallel

tbaldridge16:02:24

Is this basically a variable buffer size?

dergutemoritz16:02:46

Oh yeah, even more reason to avoid them if possible 🙂

bcbradley16:02:09

yeah i guess you can think of it like that

bcbradley16:02:21

the buffer can shrink to 0 though

dergutemoritz16:02:22

@tbaldridge Do you have any info on the exception situation that came up earlier?

bcbradley16:02:26

which is how the throttling works

bcbradley16:02:02

i don't understand the syntax on line 4 with the vector as the first element of the list

dergutemoritz16:02:03

@bcbradley My version has the slight disadvantage that putting to relimit-chan could block if the loop is currently blocked by putting to out-chan but that might not be an issue in practice.

dergutemoritz16:02:30

@bcbradley That's the syntax of async/alt!, look it up in its doc string

tbaldridge16:02:28

The spinning loop problem with core.async is a thing, and we have a patch for it, so I'm not sure where it needs to go from there.

tbaldridge16:02:23

@dergutemoritz so here's another option...there's an internal protocol for buffers that's fairly simple and would make this code rather clean. However, it's not a public specification, so core.async may change the interface in the future

tbaldridge16:02:40

On the other hand it hasn't changed in like 3-4 years, so it probably won't change

bcbradley16:02:06

it's a shame implementing a floating throttle is so difficult

bcbradley16:02:17

every solution i've seen has edge cases

tbaldridge16:02:58

It's pretty trivial to write a custom buffer that calculates full? based on the value in an atom - current buffer size.
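A minimal sketch of such a buffer, modeled on core.async's FixedBuffer and assuming the internal clojure.core.async.impl.protocols/Buffer protocol (full?, remove!, add!*, close-buf!) — these names come from the non-public internals and may differ across versions:

```clojure
(ns throttle.buffer
  (:require [clojure.core.async.impl.protocols :as impl])
  (:import (java.util LinkedList)))

;; full? consults an atom holding the current allowed size, so the
;; channel stops accepting new puts as soon as the allowance shrinks.
(deftype ThrottleBuffer [^LinkedList buf max-size]
  impl/Buffer
  (full? [_] (>= (.size buf) @max-size))
  (remove! [_] (.removeLast buf))
  (add!* [this itm] (.addFirst buf itm) this)
  (close-buf! [_])
  clojure.lang.Counted
  (count [_] (.size buf)))

(defn throttle-buffer
  "max-size is an atom; swap! it to grow or shrink the effective buffer."
  [max-size]
  (ThrottleBuffer. (LinkedList.) max-size))

;; (async/chan (throttle-buffer (atom 0)))
```

Note that shrinking the allowance only blocks new puts; items already buffered still drain to takers.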

bcbradley16:02:34

huh that sounds pretty elegant

bcbradley16:02:47

you're right, it's a pity the protocol isn't public

tbaldridge16:02:30

but....like that protocol was last changed 1.5 years ago

tbaldridge16:02:57

So it's up to you if you want to risk it or not.

bcbradley16:02:48

what do you think personally?

bcbradley16:02:09

if i made a throttle-buffer using that protocol do you think it might break in the future?

dergutemoritz16:02:27

@tbaldridge Thanks, upvoted 🙂

tbaldridge16:02:29

@bcbradley I'd put it in a function that's fairly abstract. And if the protocol ever changes, I'd rewrite the function

tbaldridge16:02:04

by abstract I mean that all the details of how you implement this throttle shouldn't escape from that function.

bcbradley16:02:37

i was thinking something like pipe, taking an in and an out chan

bcbradley16:02:43

but also taking a control chan

bcbradley16:02:45

dergutemoritz's approach wasn't bad either

bcbradley16:02:04

the biggest edge case is that it blocks when the out-chan blocks

bcbradley16:02:22

but i think i can work around that like this:

(defn- throttle-chan [in-chan out-chan relimit-chan]
  (let [buff-chan (async/chan)]
    (async/go-loop [allowance 0]
      (async/alt!
        relimit-chan
        ([f] (recur (f allowance)))
        in-chan
        ([val] (if (< 0 allowance)
                 (do (async/>! buff-chan val)
                   (recur (dec allowance)))
                 (recur allowance)))))
    (async/pipe buff-chan out-chan)))

dergutemoritz16:02:08

@bcbradley That would exhibit the same problem I'm afraid

dergutemoritz16:02:42

async/pipe would only pick up the next value from buff-chan after it has succeeded in putting the previous one

bcbradley16:02:52

oh yeah thats right

bcbradley16:02:03

ugh this is hard

tbaldridge16:02:40

yeah, anything that takes instant action when the buffer size changes is going to be hard to implement

bcbradley16:02:23

i really don't want to dig around in the murky depths of core async's implementation

bcbradley16:02:36

i'm not an expert with the jvm or java interop

tbaldridge16:02:51

um...your function isn't bad, you just need a small tweak:

tbaldridge16:02:28

The problem is that you're blocking in the second part of your alt.

bcbradley16:02:54

you saying something like poll?

bcbradley16:02:59

well, the opposite

bcbradley16:02:04

what is it? put?

tbaldridge16:02:23

this is going to get a tad ugly, but here's the gist: if your buffer is not full, then you need to accept new values from relimit or from in, as well as try to put values into the output channel

tbaldridge16:02:40

if your buffer is full, you still need an alt, but this time only on relimit or the output

dergutemoritz16:02:40

How about going with the atom after all but instead of directly swapping it for decreasing the allowance, also go through relimit-chan

bcbradley16:02:23

idk would that require a spin?

tbaldridge16:02:36

none of this really requires a spin.

tbaldridge16:02:26

What might help you is to write this out as a flow graph on some paper, your existing code is missing an alt! where you currently have >! buff-chan val

dergutemoritz16:02:32

That serializes the updates to the atom so you get rid of those nasty concurrency issues at least

tbaldridge16:02:44

but you don't need an atom at all

dergutemoritz16:02:46

@tbaldridge What to do if you can't put to out-chan without blocking, though? Dropping doesn't seem to be an option

bcbradley16:02:18

what would happen if you received a package in the relimit-chan setting the atom to 0 in the first go-loop at the same time that you dereference the atom in the second go-loop?

tbaldridge16:02:35

you use an alt! to put on the out-chan

dergutemoritz16:02:54

@tbaldridge I mean what else do you put in the alt!?

tbaldridge16:02:59

you alt between out-chan and the relimit-chan

dergutemoritz16:02:23

Ah right, of course

dergutemoritz16:02:28

And add another loop around that

tbaldridge16:02:29

if you get a new value on the relimit-chan, modify your local value; if the buffer size is still too small, alt again on the out+relimit chans. Else alt on all three

dergutemoritz16:02:54

Yeah, that's nicer than fudging around with the atom 🙂

tbaldridge16:02:14

you might even be able to do something elegant with alts! which takes a collection of channels

tbaldridge16:02:44

Like (if (buffer-has-room? ..) (conj chans in-chan) chans)

dergutemoritz16:02:52

Oh yeah, you could make the collection of channels part of the loop state

dergutemoritz16:02:07

Take that, golang

bcbradley16:02:08

i'm lost lol

bcbradley16:02:35

sounds like i need to study up on alts though

tbaldridge16:02:00

yeah, take a look at alts! it's pretty simple (alt! is just alts! + a case statement)

dergutemoritz16:02:20

@bcbradley The basic idea is to include relimit-chan in every potentially blocking operation via alt! (or alts!, respectively) so that you will always be able to service it

bcbradley16:02:47

ok that is far better clarity

bcbradley16:02:53

thats actually what is going on

tbaldridge17:02:19

and then think of your problem this way: you want to always read from one channel (relimit), sometimes send to one channel (if you have a value to send to out), and sometimes read from one channel (in) if you have room in the buffer.

bcbradley17:02:20

relimit-chan has to be serviced regardless of the conditions of the other chans
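A sketch of the loop discussed above — clause shapes per the alt! docstring, channel-close handling omitted for brevity, names illustrative:

```clojure
(require '[clojure.core.async :as async])

;; relimit-chan appears in every alt!, so it is always serviced.
;; Loop state: the current allowance plus at most one pending value.
(defn throttle-chan [in-chan out-chan relimit-chan]
  (async/go-loop [allowance 0, pending ::none]
    (cond
      ;; A value is waiting: race the put against relimit updates.
      (not= pending ::none)
      (async/alt!
        [[out-chan pending]] (recur allowance ::none)
        relimit-chan ([f] (recur (f allowance) pending)))

      ;; Allowance available: accept input or a relimit update.
      (pos? allowance)
      (async/alt!
        in-chan ([v] (recur (dec allowance) v))
        relimit-chan ([f] (recur (f allowance) ::none)))

      ;; No allowance: only relimit can unblock us.
      :else
      (recur ((async/<! relimit-chan) allowance) ::none))))
```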

bcbradley17:02:19

thanks for the help guys

bcbradley17:02:23

i appreciate it

dergutemoritz17:02:12

You're welcome, that was a fun problem 🙂

bcbradley17:02:40

i'm just trying to build up an api for discord's chat / voice service

bcbradley17:02:58

i figure if i build the pieces out of reusable stuff i could make wrappers for other apis

bcbradley17:02:22

throttling is a big part of most HTTP services

bcbradley17:02:35

i also need to work on a general cache

bcbradley17:02:45

i think atoms are a good fit for that

bcbradley17:02:25

actually i'm not sure if those would work

bcbradley17:02:13

the way I have it set up is my functions immediately return a channel that will eventually contain the result of the request

bcbradley17:02:48

a cache would basically be a function in the call stack that doesn't invoke the functions which access the remote service if it has the value for the request cached

bcbradley17:02:31

but in order to grow the cache, the result of the request that is eventually pushed to the return value channel needs to find its way into the cache, however that is upstream on the call stack

bcbradley17:02:21

thus the cache-function needs to invoke the service-function with a "control-chan" on each request

bcbradley17:02:29

at least that's how i've got it set up right now

bcbradley17:02:08

it looks something like

;; control-chan is in-band, found as a key/value in the maps sent out over out-chans
(defn- cache [in-chan out-chan]
  (let [store (atom {})]
    (async/go-loop [data (async/<! in-chan)]
      (if-let [remembered (find @store (select-keys data [:method :path :query :body]))]
        (async/>! (:return-chan data) (val remembered))
        (let [control (async/chan 1)]
          (async/>! out-chan (assoc data :cache-control control))
          (async/go
            (let [[k v] (async/<! control)]
              (swap! store assoc (select-keys k [:method :path :query :body]) v)))))
      (recur (async/<! in-chan)))))

dergutemoritz17:02:15

The library just gives you various cache strategies implemented as pure functions, so you should be able to plug those into your setup in the right spot.

bcbradley17:02:41

i'll give it a second look

dergutemoritz17:02:43

Right, should be able to just plug them into your swap! there

dergutemoritz17:02:13

Otherwise you'll have to roll your own cache eviction strategy and stuff

bcbradley17:02:38

there isn't anything to evict

bcbradley17:02:05

i mean i guess you could assoc over a key thats already in there

dergutemoritz17:02:21

Oh so it will just keep everything it ever saw forever?

dergutemoritz17:02:33

Then a bare atom is fine, of course

bcbradley17:02:42

you don't have to send anything over the cache-control chan

bcbradley17:02:47

its perfectly fine not to

bcbradley17:02:03

if you do, it will get cached for future requests

bcbradley17:02:55

i guess that makes a lot of garbage though

bcbradley17:02:15

a bunch of cache-chans are made that live for a little while and are never used, then get garbage collected