
So I have an event bus, and I want to selectively push events from this bus to clients over websockets according to their ‘subscriptions’ (each client subscribes to and is authorized to view specific messages). How should I design this using core.async?

Ben Sless 14:06:22

Why not use async/pub and async/sub and set up "draining" go loops which will consume from the subscription and push to the websocket?

Ben Sless 14:06:36

I'm not sure it will handle backpressure right

Ben Sless 14:06:38

but you can try


do not use go blocks for IO, use async/thread


IO can starve the limited core.async thread pool, which impacts all go blocks

Ben Sless 14:06:39

you're right! my oversight. but what would you do about an unbounded number of consumers?


you can use the channels created by pub and sub inside thread, and core.async channels impose backpressure unless you actively try to subvert it
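A minimal sketch of that suggestion, assuming events are maps with a `:topic` key. The names `events`, `publication`, `start-client!` and `send!` are made up for illustration; `send!` stands in for whatever blocking websocket write you have. As far as I understand `pub`, a subscriber whose buffer is full holds up delivery for its topic, which is the backpressure mentioned above.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical names: `events` is the bus channel, events are maps with :topic.
(def events (a/chan 16))
(def publication (a/pub events :topic))

(defn start-client!
  "Subscribes a client to `topic` and drains its channel on a real thread.
  `send!` is a placeholder for the blocking websocket write."
  [topic send!]
  (let [ch (a/chan 8)]                 ; fixed buffer, so puts block when it is full
    (a/sub publication topic ch)
    (a/thread
      (loop []
        (when-some [msg (a/<!! ch)]    ; nil means the channel was closed
          (send! msg)                  ; blocking IO stays off the go-block pool
          (recur))))
    ch))
```

Closing the returned channel ends the thread once its buffer has been drained.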

Ben Sless 14:06:45

To clarify - if you start an infinite loop (until the socket is closed) in a thread for every user, you'll be allocating an unbounded number of threads. I'm not saying you can't use the channels, I'm wondering how to handle this unbounded thread creation


you can use alts!! on a collection of channels in a single thread
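For what it's worth, a rough sketch of the single-thread idea. `drain-all!` and `send!` are hypothetical names, and this version takes a fixed collection of channels; adding clients while it runs would need an extra control channel.

```clojure
(require '[clojure.core.async :as a])

(defn drain-all!
  "Runs one thread that services every channel in `chans`.
  `send!` is a hypothetical (fn [ch msg]) that does the blocking write.
  Returns the thread's channel, which closes once all inputs are closed."
  [chans send!]
  (a/thread
    (loop [open (set chans)]
      (when (seq open)
        (let [[msg ch] (a/alts!! (vec open))]
          (if (nil? msg)
            (recur (disj open ch))     ; channel closed: stop watching it
            (do (send! ch msg)
                (recur open))))))))
```

One thread now serves any number of clients, at the cost that a slow write to one client delays the others.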


you can isolate the io into a thread, and move other coordination code into go blocks


yeah that sounds good


I don’t think I want pub/sub because nearly every websocket connection is going to want different events


I must say, the core.async limitations about IO and CPU heavy processes inside go blocks seem to constantly violate the principle of least surprise. it seems to come up in more questions on this channel than all other issues combined...


oh! sounds like you want a router!


…and also I’m thinking I’d like to handle authorization on the ‘pub’ side


a router does sound like something I might want 🙂


but I don’t see anything about routers in the core.async api?


I wonder how closely "router with auth middleware and a peer to peer chat between clients" maps to this LOL - classic async 101 example


clojure has routers that are data-in / data-out


but I'm talking about the abstraction here, not necessarily a drop in implementation (because of course you don't want an HTTP router and that's what most people mean when they say router...)


yeah, essentially I’d like something like an ‘inverted’ router, that reads events from a single source and ‘fans out’ to deliver them to the subscription websockets


one idea I had was to have an atom containing the websocket ‘routes’/subscriptions, and a ‘router’ thread that reads messages and forwards them according to what’s in the atom
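Roughly what the atom idea could look like. This is only a sketch: `routes`, `start-router-thread!` and the channel-to-predicate shape of the map are assumptions, with the predicate doubling as the authorization check on the sending side.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical shape: client channel -> predicate saying whether
;; that client wants (and is allowed to see) a given event.
(def routes (atom {}))

(defn start-router-thread!
  "Reads events from `events` on a real thread and forwards each one
  to every client in `routes` whose predicate accepts it."
  [events]
  (a/thread
    (loop []
      (when-some [event (a/<!! events)]   ; nil means the bus channel closed
        (doseq [[ch wants?] @routes
                :when (wants? event)]
          (a/>!! ch event))               ; blocks if that client's buffer is full
        (recur)))))
```

Clients are added with `(swap! routes assoc ch pred)` and removed with `(swap! routes dissoc ch)`, and `@routes` can be inspected from a repl at any time.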


you don't even have to invert anything if you abstract the incoming message and outgoing sends to function calls - it just happens that your input io layer is an event bus and your "handlers" are channel writes


I guess the difference is that the result of the channel write doesn't need to go back to the event bus


so I’m thinking I could have a simple per-websocket loop in a future or similar, that receives messages from the router over a dedicated channel


or - a go-loop owning a set of clients it manages via recur arg, and it reads the channel that provides incoming event-bus events


the only thing that needs a real thread is the IO, and that can be abstracted out


hmm yeah I guess


how would I add/remove clients with that setup?


starting/stopping a websocket would happen independently of the event loop


a channel could provide sub / unsub operations to the router loop


or perhaps more elegantly, you only need a channel for sub, and closing a channel (which should already be detected and handled) can signal its removal
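A sketch of the loop-owns-the-clients variant, under the assumption that each client is represented by its own channel and that some separate thread does the actual websocket IO on the other end. `start-router!`, `events` and `subs` are illustrative names. It relies on `>!` returning false when the target channel is closed.

```clojure
(require '[clojure.core.async :as a])

(defn start-router!
  "`events` carries bus messages, `subs` carries new client channels.
  The set of clients lives only in the loop binding.
  Returns the go-loop's channel, which closes when either input closes."
  [events subs]
  (a/go-loop [clients #{}]
    (let [[v port] (a/alts! [subs events])]
      (cond
        (nil? v)      nil                      ; an input closed: stop routing
        (= port subs) (recur (conj clients v)) ; new subscription
        :else
        (let [alive (loop [remaining (seq clients)
                           alive     clients]
                      (if-let [ch (first remaining)]
                        (let [ok? (a/>! ch v)] ; false when ch is closed
                          (recur (next remaining)
                                 (if ok? alive (disj alive ch))))
                        alive))]
          (recur alive))))))
```

Unsubscribing is then just `(a/close! client-ch)`; the router drops the channel the next time it tries to write to it.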


you mean signal over the same channel as for subs?


also, what are the advantages of this approach over keeping an atom?


one advantage of the atom approach is that it’ll make it trivial for me to inspect the ‘subscription state’, e.g. from a repl


the advantage of the loop param option is that it doesn't tie a block of code to a single global resource, which has implications for testing and refactoring


ah okay got it, thanks


also, you end up with coordination questions between the atom and the go-loop, e.g. removing a chan from the atom while the go-loop is mid channel operation. not a likely problem, but one you need to keep in mind


if the advantage is repl visibility, you can use reset! inside the loop to update an atom, and still do all the internal ops in a way that leverages the advantages of core.async coordination


without mixing in atom swap! semantics
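To illustrate the mirroring trick on its own: in this sketch the loop owns the state and only publishes a copy for inspection. `subscriptions-view` and `track-subs!` are made-up names, and nothing should write to the atom except the loop.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical name. Read-only by convention: only the loop resets it.
(defonce subscriptions-view (atom #{}))

(defn track-subs!
  "Accumulates values arriving on `subs` in a loop binding and mirrors
  the current set into `subscriptions-view` for repl inspection."
  [subs]
  (a/go-loop [clients #{}]
    (reset! subscriptions-view clients)   ; publish a snapshot, never read it back
    (when-some [client (a/<! subs)]
      (recur (conj clients client)))))
```

`@subscriptions-view` then shows the latest snapshot, while all decisions inside the loop are made from the `clients` binding.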


all that said, you know your app better than I do, sometimes the benefit of using an atom is simpler code, and you can prove the gotchas I mentioned won't matter


yeah, I think I’ll have to try both out and see what feels best 🙂


I think I have a pretty good idea of how to make this happen now, so thanks


one other small thing: a bare bones router - have a "dispatch" map from key to targets {dispatch-x #{chan0 chan2} dispatch-y #{chan1 chan2}} then (let [subscribers (get dispatch (key-fn message))] ...)

👍 3
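Spelled out a little, the bare bones router from the last message might look like this. `route!` is a hypothetical name, and it uses blocking puts on the assumption that it is called from a real thread rather than a go block.

```clojure
(require '[clojure.core.async :as a])

(defn route!
  "Delivers `message` to every channel registered under (key-fn message)
  in the `dispatch` map. Returns the number of subscribers it looked up."
  [dispatch key-fn message]
  (let [subscribers (get dispatch (key-fn message))]
    (doseq [ch subscribers]
      (a/>!! ch message))     ; blocking put: backpressure from slow subscribers
    (count subscribers)))
```

For example, with `{:x #{chan0 chan2} :y #{chan1 chan2}}` as the dispatch map and `:type` as `key-fn`, a message `{:type :x}` goes to `chan0` and `chan2` only.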