Microservice with Ktor and Kafka

Haha, don’t take my random samples as actual real code. :stuck_out_tongue: When I put // Kafka code goes here, I just meant “this is the place where you would register your Kafka code if you want it to run on the Netty event loop”. I’m certainly not advocating creating an anonymous object and putting all of your Kafka connection code into it. I would expect you to follow standard practice: create a class for your Kafka code and supply the Kafka connection lazily.

As for the Netty loop being blocking and not suspending, I think that’s what @frevib wanted? A single event loop that is utilised by both Netty and Ktor, to reduce the number of threads and increase performance. As you point out, that may not work, since Ktor uses suspending functions. It also may not be pretty, but hey, be careful what you ask for. :stuck_out_tongue:

The NettyApplicationEngine does indeed convert the event loop into a coroutine-friendly one; see here. It also creates a userContext which may be for handling and queuing up incoming requests, not sure.

Ohh, I actually made a silly mistake in my Ktor code above. I missed the coroutineScope, so I accidentally used the coroutine context of the application :facepalm: So I was wrong, dispatcher is correctly set for request handlers and we stay in the Netty thread pool:

get("/") {
    coroutineScope {
        println("1: ${Thread.currentThread()}") // Thread[eventLoopGroupProxy-4-1,5,main]
        println("2: ${coroutineContext[ContinuationInterceptor]}") // NettyDispatcher@3299ded9
        launch {
            println("3: ${Thread.currentThread()}") // Thread[eventLoopGroupProxy-4-1,5,main]
        }
    }

    call.respondText("Hello World!")
}

I’m not sure what you mean here, to be honest. The point is: by going into Netty and its thread pools, we can’t run coroutines directly. We can still implement a Kafka consumer, but the existing code using coroutines, flows, etc. would have to be rewritten as event-based code.
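To illustrate the kind of rewrite meant above, here is a hypothetical contrast (toy names and data, not the actual consumer code) between a Flow-based consumer and the callback style that could run directly on a non-coroutine thread pool:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Coroutine/Flow style, roughly how the existing consumer presumably looks:
fun messagesAsFlow(): Flow<String> = flow {
    emit("a")
    emit("b")
}

// Event/callback style, which can run directly on a non-coroutine thread pool:
fun interface MessageListener { fun onMessage(msg: String) }

fun pollMessages(listener: MessageListener) {
    // Stand-in for kafkaConsumer.poll(); delivers each message to the listener.
    listOf("a", "b").forEach(listener::onMessage)
}

fun main() {
    runBlocking { messagesAsFlow().collect { println("flow: $it") } }
    pollMessages { println("callback: $it") }
}
```

Same logic both times; only the second shape fits an event loop that knows nothing about continuations.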

Anyway, to sum up everything:

  1. Internally, Ktor runs coroutines on top of the Netty thread pool by using NettyDispatcher.
  2. I didn’t find a clean and documented way to acquire this dispatcher. Unfortunately, events like ApplicationStarted are executed in another context.
  3. We can get it by hacking, in at least two ways:
  • Capture it in a request handler.
  • Use reflection to read the private field engine.userContext.
  4. We can also try to acquire the Netty thread pool and turn it into a coroutine dispatcher ourselves. Depending on how we get it and what its exact type is, we can use code similar to NettyDispatcher, or, if we can get it as an Executor, we can simply call asCoroutineDispatcher().
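For the last point, a minimal sketch of the Executor route. The single-threaded executor here is just a stand-in for the real Netty pool, and the names are illustrative:

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*

// Runs a coroutine on a given executor via asCoroutineDispatcher() and
// returns the name of the thread it actually ran on.
fun runOnExecutorThread(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-event-loop") }
    val dispatcher = executor.asCoroutineDispatcher()
    val name = runBlocking {
        withContext(dispatcher) { Thread.currentThread().name }
    }
    dispatcher.close() // also shuts down the underlying executor
    return name
}

fun main() {
    println(runOnExecutorThread()) // fake-event-loop
}
```

If Netty hands us anything that implements Executor, the same two lines (asCoroutineDispatcher() plus a dispatch) should be all the bridging needed.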

I think what frevib was asking for originally was to hook into the existing event loop in order to prevent running multiple threads.

This is my loose understanding of how an event loop works:

while (true) {
    val newRequest = checkForNewRequest()

    if (newRequest != null) {
        requestHandlers.firstOrNull { it.canHandleRequest(newRequest) }
            ?.handleRequest(newRequest)
    }
}

So what I figured frevib was asking for was a way to add his own code into that loop, instead of creating another thread that also does while (true). So ideally, you’d change the above code to this:

while (true) {
    val newRequest = checkForNewRequest()

    if (newRequest != null) {
        requestHandlers.firstOrNull { it.canHandleRequest(newRequest) }
            ?.handleRequest(newRequest)
    }

    val newMessages = kafkaConsumer.poll()
    newMessages.forEach { handleMessage(it) }
}

Obviously, you wouldn’t actually be adding your code directly into the loop. But maybe if the existing loop is something like this:

while (true) {
    val newRequest = checkForNewRequest()

    if (newRequest != null) {
        requestHandlers.firstOrNull { it.canHandleRequest(newRequest) }
            ?.handleRequest(newRequest)
    }

    otherHandlers.forEach { it.checkForNewEvents() }
}

Then you could add your Kafka Consumer code into the list of otherHandlers, so that it gets run inside the same while (true) loop that all the Netty code runs in.

Yes, we can run our Kafka consumer on Netty’s thread pool. We just can’t use the existing consumer code directly, because this thread pool is not capable of running coroutines. And we can’t simply use runBlocking, because that is disallowed on event loops. The consumer would have to be rewritten from scratch, from coroutines/flows to classic event-based code. Or (better) we can bridge coroutines and the Netty thread pool by creating a dispatcher, as Ktor does. This could be pretty straightforward, but it may also turn out to be a little harder.
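A minimal sketch of such a bridging dispatcher, in the spirit of (but not identical to) Ktor’s NettyDispatcher. A plain Executor stands in for the Netty event loop; the class and thread names are made up:

```kotlin
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Hypothetical sketch (not Ktor's actual class): every continuation
// resumption is handed back to the underlying event-loop executor.
class EventLoopDispatcher(private val eventLoop: Executor) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) =
        eventLoop.execute(block)
}

fun main() {
    // Stand-in for the Netty event loop: a single dedicated thread.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "loop-thread") }
    val dispatcher = EventLoopDispatcher(executor)
    runBlocking {
        launch(dispatcher) {
            println(Thread.currentThread().name) // loop-thread
        }.join()
    }
    executor.shutdown()
}
```

With such a dispatcher in hand, the existing coroutine/flow consumer could stay as it is and simply be launched on it.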

Correct, but then also using coroutines.

This is my loose understanding of how an event loop works:

Correct-ish. Simple explanation: How does non-blocking IO work under the hood? | by Hielke de Vries | ING Blog | Medium

The event loop is the engine on which coroutine Jobs and Ktor HTTP requests are completed. Coroutine syntax is just a nice way of expressing async code; under the hood, the event loop (or anything else that can complete jobs, like a thread pool) takes care of executing the Jobs.

So we have two separate concepts:

  • event loop (engine)
  • coroutines (syntax)

Ktor uses a Netty event loop; coroutines by default run on something else (the coroutine library’s own scheduler, e.g. the Dispatchers.Default thread pool). Ideally we want only one event loop, but still the coroutine syntax. As I see it now, we have a solution that uses the Netty event loop but gives up the coroutine syntax: we likely get the performance gain of a single event loop (Netty), but miss out on the nice coroutine syntax, as @broot mentioned.
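To illustrate the engine/syntax split, here is a sketch where a single-threaded executor plays the role of the event loop and several coroutines are all multiplexed onto, and completed on, that one thread (the thread name and function are made up):

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*

// Launches n coroutines on a dispatcher backed by one thread named "loop"
// and returns the thread name each coroutine completed on.
fun runJobsOnSingleLoop(n: Int): List<String> {
    val loop = Executors.newSingleThreadExecutor { r -> Thread(r, "loop") }
        .asCoroutineDispatcher()
    val names = runBlocking(loop) {
        (0 until n).map {
            async {
                delay(10) // suspends without blocking the loop thread
                Thread.currentThread().name
            }
        }.awaitAll()
    }
    loop.close()
    return names
}

fun main() {
    println(runJobsOnSingleLoop(3)) // [loop, loop, loop]
}
```

So the coroutine syntax survives intact; only the engine underneath changes, which is exactly what swapping in the Netty loop would do.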

You can do it as easily as:

EventLoopGroupProxy.create(1).asCoroutineDispatcher()

:grinning:


I think you could do this:

Create your own event loop and provide it to Netty via the configuration lambda, then create a dispatcher from that event loop and use it to run your Kafka consumer with suspending code. I think that’d be most of what you want? The fact that there are multiple dispatchers created from the same thread pool might be an issue… but maybe with a bit more tooling you can get what you want. :slight_smile:

Cheers I’ll give it a read, always keen to expand my understanding on topics like this.


It’s a start, but I guess it works :slight_smile:

fun main() {
    val nettyEventLoopGroup = EventLoopGroupProxy.create(1)

    embeddedServer(Netty, module = Application::modules, configure = {
        configureBootstrap = {
            group(nettyEventLoopGroup)
        }
    }).start(wait = false)

    runBlocking(nettyEventLoopGroup.asCoroutineDispatcher()) {
        launch {
            println("thread::: ${Thread.currentThread().name}") // thread::: eventLoopGroupProxy-1-1
        }
    }
}

Yes, I think this should be fine. Please verify that by using configureBootstrap you haven’t entirely replaced the default config. If you have, it probably makes sense to duplicate the code Ktor uses.


Should it be a runBlocking, or a CoroutineScope(nettyEventLoopGroup.asCoroutineDispatcher()).launch { ... }? I guess if you’re not waiting for the Netty server, then the runBlocking stops your main function from just exiting.
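A quick sketch of that difference, assuming nothing about Netty. runBlocking waits for its children before returning, so main can’t exit early; a plain CoroutineScope(...).launch is fire-and-forget and has to be joined (or the JVM kept alive some other way):

```kotlin
import kotlinx.coroutines.*

fun main() {
    val scope = CoroutineScope(Dispatchers.Default)
    // Fire-and-forget: nothing here keeps the JVM alive for this job.
    val job = scope.launch {
        delay(50)
        println("scope launch done")
    }

    // Parks the main thread until the child coroutine completes.
    runBlocking {
        launch { println("runBlocking child done") }
    }

    // Without this join, main could return before the scope job runs.
    runBlocking { job.join() }
}
```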

To elaborate on my previous comment: again, I’m not familiar with Netty/Ktor internals, but it seems that by default they use separate thread pools for handling connections and for handling requests, and they use more threads than there are CPU cores: ktor/ktor-server/ktor-server-core/common/src/io/ktor/server/engine/ApplicationEngine.kt at 8ad59e3c5416e7f2a7e3f62f7c7958ee32f09179 · ktorio/ktor · GitHub You probably just reconfigured it to use a single thread, shared between connections and requests. This will most probably impact performance greatly.

For this reason I personally would probably prefer to grab the NettyDispatcher through reflection. It’s uglier, but this way we don’t have to reconfigure the server and risk hurting performance.

Indeed, there are two event loop groups. I’ll do some performance testing and see where we’re at. I agree you don’t want too much manual configuration of Netty, or it gets complex very fast.

I put up some benchmarking code, for anyone interested: GitHub - frevib/coroutine-jmh-benchmarks-virtual-threads