#core-async
2019-07-30
rickmoynihan10:07:54

With core.async, what's the recommended way to raise an error when a channel/buffer is full / blocked?

rickmoynihan10:07:19

I’d like to exert back pressure upstream when my chan’s buffer is full by issuing an HTTP 503

rickmoynihan10:07:29

ahh I can test full? on a buffer

rickmoynihan10:07:51

though that doesn’t seem to solve it as another thread may put on the channel between when I check and add to the chan. I really need to raise an error on put rather than block.

rickmoynihan10:07:38

ok offer! looks like what I want
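
A minimal sketch of the offer!-based approach, for illustration only. The helper name `enqueue-or-503`, the Ring-style response maps, and the 202 status are assumptions and not from the conversation. `offer!` never blocks: it returns true when the value was accepted and a falsey value when the channel cannot take it immediately (or is closed), so the check and the put happen in one step and the race described above goes away.

```clojure
(require '[clojure.core.async :as a])

(defn enqueue-or-503
  "Hypothetical helper: tries to enqueue `req` on `ch` without blocking.
  Returns a Ring-style response map; 503 signals back pressure upstream."
  [ch req]
  (if (a/offer! ch req)
    {:status 202 :body "accepted"}
    {:status 503 :body "queue full, retry later"}))

;; Usage: a channel with room for one pending request.
(def requests (a/chan 1))
(enqueue-or-503 requests {:id 1}) ;; buffer has room, request is enqueued
(enqueue-or-503 requests {:id 2}) ;; buffer is full, caller gets a 503
```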

Jan K12:07:18

put! itself will eventually start throwing exceptions when the internal channel buffer fills up (1024 puts), that's in addition to the explicit channel buffer you can provide

rickmoynihan16:07:25

One thing I’d like to do is dedupe duplicate requests on a core.async channel. I initially wrote (chan 10 (distinct)), which works but will eventually OOM the server as the volatile inside the distinct transducer isn’t bounded. I was thinking I could do this by creating my own DistinctBuffer deftype that implements the Buffer protocol. Does that sound like a good idea?

hiredman16:07:07

There are broadly 3 places you could implement that: 1. a custom buffer 2. a custom transducer 3. a custom version of async/pipe (copies between 2 channels)

hiredman16:07:31

the last is maybe the quickest to prototype and try out

rickmoynihan16:07:28

I did think about 2 and 3 also, but didn’t mention the other requirement; which was to enforce a fixed buffer size on the set.

rickmoynihan16:07:51

so I think a custom buffer might be best

hiredman16:07:00

ah, you want to use the buffer as the set

rickmoynihan16:07:05

yeah — essentially use a set as the buffer, with a max size
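
A rough sketch of what such a buffer could look like, assuming a `java.util.LinkedHashSet` as the backing store (it keeps insertion order and ignores duplicates). The names `DistinctBuffer` and `distinct-buffer` are hypothetical. It relies on `clojure.core.async.impl.protocols`, which is an implementation namespace and may change between core.async releases.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])
(import '(java.util Iterator LinkedHashSet))

;; Hypothetical buffer: holds at most n distinct items in FIFO order.
;; The channel calls these methods while holding its own mutex.
(deftype DistinctBuffer [^LinkedHashSet items ^long n]
  impl/Buffer
  (full? [_] (>= (.size items) n))
  (remove! [_]
    ;; Take the oldest item; only called when the buffer is non-empty.
    (let [^Iterator it (.iterator items)
          v (.next it)]
      (.remove it)
      v))
  (add!* [this itm]
    ;; Adding an item already in the set is a no-op, which is the dedupe.
    (.add items itm)
    this)
  (close-buf! [_])
  clojure.lang.Counted
  (count [_] (.size items)))

(defn distinct-buffer [^long n]
  (DistinctBuffer. (LinkedHashSet.) n))

;; Usage
(def ch (a/chan (distinct-buffer 3)))
(a/offer! ch :a) ;; => true
(a/offer! ch :a) ;; => true (accepted, but dropped as a duplicate)
(a/offer! ch :b) ;; => true
(a/poll! ch)     ;; => :a
(a/poll! ch)     ;; => :b
(a/poll! ch)     ;; => nil
```

One limitation of this sketch: once the buffer is full, a put of a value that is already buffered still blocks (or fails with `offer!`), because the channel checks `full?` before it calls `add!*`.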

rickmoynihan17:07:25

ok this should be pretty easy actually

rickmoynihan17:07:38

but need to go home now 😞 so it’s gonna have to wait

alexmiller17:07:23

did you try the dedupe transducer?

alexmiller17:07:01

that will remove consecutive dupes (but not overall dupes)
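
To make the difference concrete, here are the two transducers applied to a plain vector; the same transducers can be passed to `chan`, as in `(chan 10 (dedupe))`. The input data is made up for illustration.

```clojure
(def input [1 1 2 1 3 3])

;; dedupe only compares each item with the one immediately before it,
;; so it needs to remember a single value.
(into [] (dedupe) input)   ;; => [1 2 1 3]

;; distinct remembers every item it has ever seen, which is why its
;; memory use grows without bound on a long-lived channel.
(into [] (distinct) input) ;; => [1 2 3]
```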

rickmoynihan18:07:50

yeah I did but I’d like to remove any dupes in the buffer

noisesmith18:07:20

I wonder if some lib has implemented cache-chan yet

noisesmith18:07:46

with a clojure.core.cache cache, and a keying function, you could get TTL / FIFO / LIFO etc.

noisesmith18:07:37

this lib looks like it tries that idea, can't vouch for how well it does so https://github.com/benashford/memoasync

Joe Lane18:07:28

Would it be simpler to make that part of a stateful transducer instead?

noisesmith18:07:46

I think that's a better approach than the above lib, yeah

hiredman18:07:11

what does a "cache-chan" do? memoization works for functions because they map an input to an output, but a channel is input in and input out, so it is not clear to me what memoizing that means

noisesmith18:07:26

that's why I suggested a keying function (I guess one would want a transform function as well)

noisesmith18:07:36

I guess it's a half baked idea really

alexmiller19:07:38

promise-chan is a one-slot cache :)

noisesmith19:07:48

for the original question, not passing along dups, the message itself would be the cache key

noisesmith19:07:03

and then the cache rules would constrain the size of that dedupe cache
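
One way to read this idea is as a stateful transducer whose "seen" set is bounded, here with simple FIFO eviction in place of a clojure.core.cache policy. This is only a sketch: the name `distinct-within` is hypothetical, `n` is assumed to be a positive integer, and the message itself is used as the cache key.

```clojure
(import '(java.util Iterator LinkedHashSet))

(defn distinct-within
  "Hypothetical transducer: drops an item if it equals one of the last `n`
  distinct items passed downstream. Oldest entries are evicted first."
  [n]
  (fn [rf]
    (let [^LinkedHashSet seen (LinkedHashSet.)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (.contains seen input)
           result ;; duplicate within the window: drop it
           (do
             ;; Make room by evicting the oldest remembered item.
             (when (>= (.size seen) n)
               (let [^Iterator it (.iterator seen)]
                 (.next it)
                 (.remove it)))
             (.add seen input)
             (rf result input))))))))

;; Usage on a plain collection; (chan 10 (distinct-within 100)) would
;; apply the same logic to a channel.
(into [] (distinct-within 2) [1 2 1 3 1 2]) ;; => [1 2 3 1 2]
```

Unlike a set-backed buffer, this remembers the last `n` items that passed through, not the items currently waiting in the channel's buffer, so the two approaches dedupe against slightly different windows.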

hiredman19:07:12

and promise-chan works great when memoizing

alexmiller19:07:15

the problem with wanting to filter all dupes is that you need arbitrary sized memory to remember what you've seen

rickmoynihan19:07:00

not all dupes ever, just all dupes in a bounded buffer

hiredman19:07:26

the memoasync project likely predates promise-chan

rickmoynihan19:07:44

yeah it predates transducers too