Fork me on GitHub
#core-async
<
2020-06-18
>
wombawomba10:06:00

So I have an event bus, and I want to selectively push events from this bus to clients over websockets according to their ‘subscriptions’ (each client subscribes to and is authorized to view specific messages). How should I design this using core.async?

Ben Sless14:06:22

Why not use async/pub and async/sub and set up "draining" go loops which will consume from the subscription and push to the websocket?

Ben Sless14:06:36

I'm not sure it will handle backpresure right

Ben Sless14:06:38

but you can try

noisesmith14:06:45

do not use go blocks for IO, use async/thread

noisesmith14:06:12

IO can starve the limited core.async thread pool, which impacts all go blocks

Ben Sless14:06:39

you're right! my oversight but what would you do about an unbounded number of consumers?

noisesmith14:06:52

you can use the channels created by pub and sub inside thread, and core.async channels impose backpressure unless you actively try to subvert it

Ben Sless14:06:45

To clarify - if you start an infinite loop (until the socket is closed) in a thread for every user, you'll be allocating an unbounded number of threads. I'm not saying you can't use the channels, I'm wondering how to handle this unbounded thread creation

noisesmith14:06:27

you can use alts!! on a collection of channels in a single thread

noisesmith14:06:50

you can isolate the io into a thread, and move other coordination code into go blocks

wombawomba14:06:59

yeah that sounds good

wombawomba14:06:22

I don’t think I want pub/sub because each nearly websocket connection is going to want different events

noisesmith14:06:50

I must say, the core.async limitations about IO and CPU heavy processes inside go blocks seem to constantly violate the principle of least surprise, it seems it's an issue with more questions on this channel than all other issues combined...

noisesmith14:06:50

oh! sounds like you want a router!

wombawomba14:06:01

…and also I’m thinking I’d like to handle authorization on the ‘pub’ side

wombawomba14:06:55

a router does sound like something I might want 🙂

wombawomba15:06:17

but I don’t see anything about routers in the core.async api?

noisesmith15:06:46

I wonder how closely "router with auth middleware and a peer to peer chat between clients" maps to this LOL - classic async 101 example

noisesmith15:06:59

clojure has routers that are data-in / data-out

noisesmith15:06:51

but I'm talking about the abstraction here, not necessarily a drop in implementation (because of course you don't want an HTTP router and that's what most people mean when they say router...)

wombawomba15:06:50

yeah, essentially I’d like something like an ‘inverted’ router, that reads events from a single source and ‘fans out’ to deliver them to the subscription websockets

wombawomba15:06:28

one idea I had was to have an atom containing the websocket ‘routes’/subscriptions, and a ‘router’ thread that reads messages and forwards them according to what’s in the atom

noisesmith15:06:30

you don't even have to invert anything if you abstract the incoming message and outgoing sends to function calls - it just happens that your input io layer is an event bus and your "handlers" are channel writes

noisesmith15:06:50

I guess the difference is that the result of the channel write doesn't need to go back to the event bus

wombawomba15:06:38

so I’m thinking I could have a simple per-websocket loop in a future or similar, that receives messages from the router over a dedicated channel

noisesmith15:06:56

or - a go-loop owning a set of clients it manages via recur arg, and it reads the channel that provides incoming event-bus events

noisesmith15:06:17

the only thing that needs a real thread is the IO, and that can be abstracted out

wombawomba15:06:58

hmm yeah I guess

wombawomba15:06:17

how would I add/remove clients with that setup?

wombawomba15:06:50

starting/stopping a websocket would happen independently of the event loop

noisesmith15:06:04

a channel could provide sub / unsub operations to the router loop

noisesmith15:06:39

or perhaps more elegantly, you only need a channel for sub, and closing a channel (which should already be detected and handled) can signal its removal

wombawomba15:06:24

you mean signal over the same channel as for subs?

wombawomba15:06:43

also, what are the advantages of this approach over keeping an atom?

wombawomba15:06:29

one advantage of the atom approach is that it’ll make it trivial for me to inspect the ‘subscription state’, e.g. from a repl

noisesmith15:06:27

the advantage of the loop param option is that it doesn't tie a block of code to a single global resource, which has implications for testing and refactoring

wombawomba15:06:01

ah okay got it, thanks

noisesmith15:06:07

also, you end up with coordination questions between the atom and the go-loop - eg. removing a chan from the atom while the go-loop is mid channel operation - not a likely problem, but one you need to keep in mind

noisesmith15:06:27

if the advantage is repl visibility, you can use reset! inside the loop to update an atom, and still do all the internal ops in a way that leverages the advantages of core.async coordination

noisesmith15:06:43

without mixing in atom swap! semantics

noisesmith15:06:37

all that said, you know your app better than I do, sometimes the benefit of using an atom is simpler code, and you can prove the gotchas I mentioned won't matter

wombawomba15:06:07

yeah, I think I’ll have to try both out and see what feels best 🙂

wombawomba15:06:58

I think I have a pretty good idea of how to make this happen now, so thanks

noisesmith15:06:37

one other small thing: a bare bones router - have a "dispatch" map from key to targets {dispatch-x #{chan0 chan2} dispatch-y #{chan1 chan2}} then (let [subscribers (get dispatch (key-fn message))] ...)

👍 3