Fork me on GitHub
#core-async
<
2021-03-31
>
danm08:03:53

I'm trying to make our APIs interactions with Kafka (the consumer for which is synchronous and blocking) work more nicely with our otherwise async pattern, and I'm hoping for some feedback of "Oh God that sounds horrible, do X instead", or "that should work OK" 😉 I've not used core.async much before. Currently, we have a ring API handler that we want to be async, but on the backend it's writing a message to Kafka and spawning a (synchronous, blocking) consumer to look for the response, per request. That's very quickly exhausting the Jetty server threadpool when we have a lot of requests. My idea is for each request to instead create a channel with a timeout, then park waiting for a response it can take on that channel. There is then a single consumer in a separate thread, which is constantly polling for messages from Kafka, checking to see if there is a channel associated with them, and putting the message onto that channel if so (do I even need a go block here, or can I just use put!? The main issue I can see is allowing the consumer thread to know which responses are relevant (easy, we can filter on a key to only messages which are in response to messages this instance sent out), and get the channel for the request. The immediate way I can see to do that is to have an atom which is a map of :request-id channel. A request assocs it's channel to the atom before calling go/sending to Kafka, and removes the request from the atom on timeout or successful response. The consumer thread just has to look and see if there is an entry in the atom for the current message's request-id. The atom is a single-point bottleneck, but my gut feeling is that it will be far faster than our request-response cycle even so, so shouldn't be an issue.

hiredman17:03:18

I think as described you are just moving stuff around. The real meat of the issue is a ring request taking occupying a thread until it gets the response back from kafka

hiredman17:03:23

core.async cannot magically make a ring handler asynchronous

hiredman17:03:53

ring does have a spec for asynchronous handlers, but it was added much later and I don't know how universal the support for it is

hiredman17:03:53

all of which is to say, this is not something core.async can solve. If you have a solution core.async might make it nicer to read (get rid of some callbacks)

danm18:03:11

The ring handler is already supposed to be async, that’s the ones we’re using. But that still relies on our code correctly being async all the way down, which with a blocking Kafka client it isn’t. By reducing the number of clients we’re running to 1, in a separate non-handler thread, and parking for a message from that via async in the handler thread, it should release the thread while waiting for a response

rickmoynihan09:04:04

That should be pretty trivial to setup with either a java thread serving a java.util.concurrent.BlockingQueue or core.async.

rickmoynihan09:04:46

core async won’t offer many advantages over using java.util.concurrent here. The main advantage of core.async will be better timeouts… probably implementable by just modifying the example above to contain a go block that alts! on a timeout channel, and returns the appropriate http code if there isn’t a response from kafka, or if it’s blocking too long on the offer/put.

rickmoynihan09:04:56

the example is obviously just faked up, to show the mechanism… I’ve not actually tried it. Here I created two threads serving kafka, as one might be a bottleneck; you’d need to dig further to be sure. For example your client may already pool connections and threads for you, in which case you probably don’t want to pool again, as you’re just using the thread to avoid blocking.

danm13:04:01

The client won’t pool automagically, but for simple consume->filter->drop I’ve done tens of thousands of messages a second with a single consumer, which is way over and above the request rates we’re currently thinking about. If that does become an issue we can get into multiple threads with a subset of partitions each, though I worry the atom would start to be a bottleneck if we get to request rates where that’s necessary

danm13:04:28

We went back and forward a bit on core.async vs just executing the response function directly from the consumer thread, but as you say alt! + timeout chan gives us the ability to time out and respond to the client better

rickmoynihan13:04:00

I could be wrong but I don’t think you need an atom here

danm13:04:19

You’re setting up a single channel for all requests, and I don’t understand how that would work? For each request we need to send a unique message to Kafka and then park waiting on the specific response to that message, or time out. Which means we need a unique channel per request and the consumer thread needs to know which one to send a given message to

danm13:04:59

I don’t get how with a single channel that all responses get dumped on a given request would correctly get its response

rickmoynihan14:04:56

@U6SUWNB9N: I could be missing something; and I haven’t actually tried this code; but I’m assuming the respond and respond-error functions ring gives you will already know which request/socket to associate them with. So I just pass those functions along with the request to the kafka thread and have that thread trigger the callbacks appropriately.

rickmoynihan14:04:26

I don’t think you need to add id’s at all, because you essentially have them already in function closures.

rickmoynihan14:04:34

If you want to do an alts! and wait, then I think you can essentially do so as I described… just fire up a go block (which is just a lightweight thread) and have it do it.

rickmoynihan14:04:17

so long as that can see the respond or respond-error functions it’ll be able to respond

rickmoynihan15:04:23

If you need to handle the timeout on kafka not servicing the queue, then you need to just pass a response channel to the kafka thread, and alt! a timeout on that.

danm15:04:30

But where would I fire up the go block? In the request handler? If I do that, how does the consumer thread know not to respond to a timed-out request? And why would the request not just get the first response put on the channel, rather than the actual response related to it? If there are multiple requests all waiting for a response on the same channel, surely there’s a race condition as to which gets the request and ditches the timeout for itself, when each one will have a timeout at a different point?

danm15:04:57

Yeah, pass a response channel to the Kafka thread. That’s what the atom is for. So it can get the correct response channel for the given request. So I’m storing a map of request: response channel. I don’t get how I could avoid using an atom for that. If I pass the response channel over another channel to the Kafka thread then I need something in there that’s reading those and building a map

danm15:04:20

Do you mean because that will be single-thread and working sequentially it doesn’t need an atom?

danm15:04:47

Although if it will be running alongside the blocking consumer I don’t see how it’s get anywhere. But it could run elsewhere as long as it was the only thing modifying the map?

rickmoynihan00:04:21

So the questions seem to be mainly about the suggestions for how you can incorporate a timeout… > But where would I fire up the go block? In the request handler? Yes, precisely this. Ring async handlers will ignore their return values, and rely on the two callback functions to signal completion. Go blocks are cheap, so you would just spawn it where you need it; as a minimum you need one over essentially anything that is putting or taking from a channel. > If there are multiple requests all waiting for a response on the same channel, surely there’s a race condition as to which gets the request and ditches the timeout for itself, when each one will have a timeout at a different point? No, that’s not what I’m describing. There should be no such race condition or potential for mixing the streams up. I’m describing a shared channel of requests to the kafka thread. Each request message over that channel then contains a one shot response channel (per ring request), which allows you to respond to the proper socket/client in ring. The go block in the ring handler will essentially close over the correct callback and response channel, so there’s no need for an atom or a map from request to response-channel, you essentially have that in your handlers function closure. > If I do that, how does the consumer thread know not to respond to a timed-out request? There is no perfect solution to this; as it’s essentially equivalent to the byzantine generals problem. You could have the client to send a NAK (negative acknowledgement) to essentially inform the server its given up or cancelled its request; and you could devise a protocol where by the server checked for NAKs before checking for messages, or do something similar to a two phase commit, but there’d always be a window of uncertainity e.g. you might have not received a nak and started processing the message commits it, but at the last moment the client times out and gives up; even though the work was done. As there is no solution that covers ever case; I’d just settle for the simple option of just using the same timeout chan across both the sending of the req, and awaiting for the response, and accept there’s always a small chance the client might give up after you’ve done the work and never know you did it.

rickmoynihan00:04:39

There’s a snippet that shows how you can arrange timeouts as described and do the rpc

danm11:04:37

I’m still failing to properly grok that. It looks like in kafka-thread you’re expecting to send a request to Kafka and then block until you get the response to that request, which can then be used for the response fn. So like a traditional HTTP request/response. But a Kafka consumer gets back every message put onto the topic for all requests, not just the responses to it. I might write 100 request messages or more before I get the response to any of them, and the first response back might be to the 50th request sent. I might read 100s, in multiple batches, blocking between each batch, before I get to the one relevant to the request I sent. I could add in a filter so that the only message that gets handled is the correct one. But fundamentally then I’m having to spawn a thread with a consumer in per request, which feels like it doesn’t get me anything over just increasing the max threadpool size for Jetty to some huge value and handling them in a sync loop there (what we’re already doing). The idea I was going for was that a single consumer thread could read every message from the topic and just look up in a map which of the currently in-flight requests it related to (if any, it might be any of them), and then respond to it correctly.

rickmoynihan08:04:53

Ok, sorry I’ve not used kafka. In that case your approach makes a lot more sense, you’ll need a way to map id to channel to assign all items in the batch. > I could add in a filter so that the only message that gets handled is the correct one. But fundamentally then I’m having to spawn a thread with a consumer in per request. Not necessarily, you can have just one real thread on the kafka consumer, and then do the filtering etc in an a/go for each request/response channel. I think with the map of id -> response-chan you’re then essentially rebuilding pub/sub, so you could possibly use core.async’s pub/sub mechanism in place of that. It may or may not be a good fit; depending on how you need the blocking/buffering to work, and how much fine grained control you need over buffered items.

danm08:04:01

Ooh, I’ve not seen any pub/sub stuff like that with it. Will have a dig. Ta!

jjttjj17:03:21

Anyone aware of any attempts to benchmark the latency introduced by core.async, vs things like manifold/atoms/agents and/or in various scenarios (such as buffered vs unbuffered channels), threads vs go-loops, overhead added by an additional go loop (ie a pipe), etc?

jjttjj17:03:37

Is something like this the right approach:

(def runs 10000)
;;'adequately' buffered chan
(defn x1 []
  (time
    (let [a1   (chan runs)
          done (promise)]
      (a/go
        (dotimes [x runs]
          (>! a1 true))
        (a/close! a1))
      (go-loop []
        (if-some [x (<! a1)]
          (recur)
          (deliver done true)))
      @done)))

;;'adequately' buffered chans with a pipe
(defn x2 []
  (time
    (let [a1   (chan runs)
          a2   (chan runs)
          done (promise)]
      (a/pipe a1 a2)
      (a/go
        (dotimes [x runs]
          (>! a1 true))
        (a/close! a1))
      (go-loop []
        (if-some [x (<! a2)]
          (recur)
          (deliver done true)))
      @done)))
(but with a grid of a lot more scenerios)