Microservice with Ktor and Kafka

Hi, what would be the best approach to running Ktor together with Kafka, or to running any other long-running process like a Kafka consumer alongside Ktor? I came up with these three ideas, but I’m not sure which would be “best” in terms of performance and possibly aesthetics. For instance, if you launch Ktor in a separate coroutine, do you get 2 event loops?

It would be nice if Ktor would support something like a Kafka consumer out-of-the-box.

  1. Running both Ktor and Kafka consumer in a Coroutine like:
fun main(args: Array<String>): Unit = runBlocking {
    launch {
        runInterruptible(Dispatchers.IO) {
            io.ktor.server.netty.EngineMain.main(args)
        }
    }

    launch(Dispatchers.IO) {
        kafkaConsumer.startConsume(..)
    }
}
  2. Running Ktor using wait = false:
fun main(): Unit = runBlocking {
    val server = embeddedServer(Netty, port = 8080, module = Application::module).start(wait = false)

    launch(Dispatchers.IO) {
        kafkaConsumer.startConsume(..)
    }
}
  3. Running the Kafka consumer as a plugin of Ktor:
fun Application.module() {
    install(KafkaKtorPlugin)
}

val KafkaKtorPlugin = createApplicationPlugin(name = "KafkaKtorPlugin") {
    // use on(..) to start the Kafka consumer
    on(MonitoringEvent(ApplicationStarted)) {
        it.setupKafka()
    }
}


fun Application.setupKafka() {
    launch(Dispatchers.IO) {
        Dependencies.kafkaKonsumerOrderA.startConsume { record ->
            println("key: ${record.key()}, value: ${record.value()}")
        }
    }
}

I would go with option 4 (which I think is meant to be option 3?). I’ve done something similar with a KTOR application I’ve built. I basically copied Dropwizard’s bundle approach and separated things out into functions/objects, so my main code looks like this:

private val components = listOf(DatabaseComponent, SecurityComponent)

fun main(args: Array<String>): Unit = EngineMain.main(args)

@Suppress("unused") // application.yml references the main function. This annotation prevents the IDE from marking it as unused.
fun Application.module() {
    configureAppModule(this)
    components.forEach {
        it.configureModule()
        it.registerHealthChecks()
    }
    doMigrations()
    configureAdministration()
    configureSerialization()
    configureMonitoring()
    configureHTTP()
    configureSecurity()
    configureRouting()
}
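To make the idea concrete, here is a minimal sketch of what such a component abstraction could look like (the AppComponent interface, its install signature, and KafkaConsumerComponent are assumptions for illustration, not my actual code):

import io.ktor.server.application.Application
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

// Hypothetical bundle/component abstraction in the spirit of Dropwizard bundles.
interface AppComponent {
    fun install(app: Application)
}

// The Kafka consumer then becomes just another component wired into the application.
// `consume` stands in for whatever long-running consume loop you have (e.g. startConsume).
class KafkaConsumerComponent(private val consume: suspend () -> Unit) : AppComponent {
    override fun install(app: Application) {
        // Application implements CoroutineScope, so the consumer lives in the app's scope
        app.launch(Dispatchers.IO) { consume() }
    }
}

The consumer then shares the same lifecycle wiring as the other components instead of needing special treatment in main.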

So you basically put the code that starts the Ktor server and the Kafka consumer behind a function/object? That looks cleaner indeed.

I’m still interested in the performance aspects. Ktor runs on a Netty event loop. What would that mean for the event loop created by runBlocking, which the Kafka consumer needs? Can the Kafka consumer and Ktor share event loop resources?

(Edited previous post, option 4 is indeed 3)

No idea, you’re getting more technical than I did. :stuck_out_tongue:

I would personally just create thread pools/resources/whatever is required for Kafka or anything else, unless KTOR/Netty provides a way to hook into its event loop and run your own code.

  1. Architectural perspective. If you consider the Kafka consumer to be logically a part of the HTTP server application, you can configure it as one of the Ktor modules/plugins. Is there any shared logic between both parts, do they cooperate on completing a single task? Do they share “beans”, do you maybe use some knowledge about HTTP sessions in the Kafka consumer, or do you have HTTP endpoints that affect how the Kafka consumer operates, etc.? If you don’t have any logic like this, and they do entirely separate things where the only thing they share is the lifecycle, then I’m not sure we should pretend the Kafka consumer is a part of the Ktor application.
  2. Performance. I don’t know what this startConsume function does, but seeing that you put it inside Dispatchers.IO, I assume it simply blocks in a loop waiting for new messages and does something with them. In that case it is not coroutines-aware, it has nothing to do with event loops, and it doesn’t really matter how or where you start it. Even if internally this consumer has an event loop of some sort, it won’t share it with Ktor or coroutines, as it knows nothing about the event loops of other components of the application.

So I just did a bit of digging, and found that Application implements CoroutineScope. Which I guess you already knew, since you used it in your third option. :stuck_out_tongue: So if the Kafka consumer code is suspending, you could just launch it on the Application. If not… create a separate dispatcher for it.
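A quick sketch of those two cases (the helper names and the consume parameters are just for illustration):

import io.ktor.server.application.Application
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.Executors

// Case 1: the consumer is a suspend function - just launch it in the Application scope.
fun Application.launchSuspendingConsumer(consume: suspend () -> Unit) {
    launch { consume() }
}

// Case 2: the consumer blocks - give it its own single-threaded dispatcher so it can't
// starve anything else. (In real code you'd also close this dispatcher on shutdown.)
fun Application.launchBlockingConsumer(consume: () -> Unit) {
    val consumerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(consumerDispatcher) {
        runInterruptible { consume() }
    }
}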

EDIT: FWIW I found this: Using existing event-loop group · Issue #286 · ktorio/ktor · GitHub

They are quite separate. The consumer consumes a message, processes it by calling a DB and executing some business logic, then produces the processed message. The Ktor REST server just serves some things out of the DB.

startConsume is indeed just the infinite poll loop, receiving messages. But every message is processed by a new coroutine, in order to process messages in parallel [1]. This means the consumer also heavily relies on the event loop created by runBlocking { .. }.

[1] kafka-konsumer-flow/src/main/kotlin/com/eventloopsoftware/consumer/KafkaKonsumerParallel.kt at main · frevib/kafka-konsumer-flow · GitHub

Again, it is hard to discuss the Kafka consuming behavior, because we don’t know what startConsume is.

Is it a suspend function? If yes, you shouldn’t generally switch to Dispatchers.IO to call it. If it is not a suspend function, it doesn’t even have access to the coroutine context of the rest of your application. Also, which runBlocking? The only runBlocking in your example is the one needed to bootstrap the application, and its event loop isn’t used at all, since you switch to Dispatchers.IO. Ktor doesn’t use it either.

edit:
Ahh, sorry, I didn’t notice startConsume is in the linked file.

I posted a link in my previous reply where you can see what startConsume does. I’ll clarify:

So there is the launch(Dispatchers.IO) to run startConsume(..) as a long-running process. And inside startConsume(..) we launch lots of coroutines that need to be completed on an event loop.

(I now see that kafka-konsumer-flow/src/main/kotlin/com/eventloopsoftware/consumer/KafkaKonsumerParallel.kt at main · frevib/kafka-konsumer-flow · GitHub is not given withContext(Dispatchers.Default), so all coroutines will be completed on its parent, Dispatchers.IO. I have yet to fix that.)

edit: no worries :slight_smile:

Ok, after looking into the startConsume, it is in fact a suspend function and uses the context and the dispatcher of the caller. If I’m reading everything correctly and I didn’t miss anything:

  1. You don’t use the event loop of the runBlocking, because you switched to Dispatchers.IO. This switch is probably a mistake, as we don’t perform blocking I/O here and therefore shouldn’t use this dispatcher.
  2. If you remove the switch, you will start using the event loop of the runBlocking, but this doesn’t give you any benefit. This event loop is not used for anything else in your application; Ktor probably doesn’t use it. Also, this dispatcher is single-threaded, and I’m not sure you really want to consume messages with parallelism limited to 1.
  3. If you execute startConsume in Application.launch, without switching to Dispatchers.IO, it will probably re-use the internal threads of Netty/Ktor. You can easily verify this by printing the thread and coroutine name when consuming.

Only now I realized this is not some well-known library for bridging Kafka and coroutines, but actually your code.

In this case:

  1. I think consumer.poll(timeout) is incorrect. If poll is a blocking call, then we are not allowed to run it in a suspend function. This is actually where you should use Dispatchers.IO - wrap this single line only, not the whole consumer.
  2. It feels strange that asFlow is actually consuming, not really converting to a flow. Then you collect to trigger the consuming inside asFlow. Why doesn’t asFlow do what it says - convert to a flow?
  3. Concurrent consuming feels overcomplicated. You can simply do: coroutineScope { records.forEach { launch { processFunction(it) }}} (see the sketch below).
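Putting points 1 and 3 together, a rough sketch of how that loop could look (consumeLoop and its parameters are just illustrative names, not code from your repo):

import java.time.Duration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer

// Only the blocking poll() hops to Dispatchers.IO; each record of a batch is then processed
// in its own child coroutine, and coroutineScope waits for the whole batch before polling again.
suspend fun <K, V> consumeLoop(
    consumer: KafkaConsumer<K, V>,
    timeout: Duration,
    processFunction: suspend (ConsumerRecord<K, V>) -> Unit,
) {
    while (true) {
        val records = withContext(Dispatchers.IO) { consumer.poll(timeout) }
        coroutineScope {
            records.forEach { record -> launch { processFunction(record) } }
        }
    }
}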

Thanks for the code review :slight_smile:

When we launch a coroutine inside the scope of Application, it is run on DefaultDispatcher-worker-x. But requests are handled on eventLoopGroupProxy-x-y. What this actually means, I have no idea. But it seems that the Kafka consumer’s records are completed on a different event loop than Ktor’s.

With your suggestions and @Skater901’s, I modified the parallel consumer like this:

channelFlow {
        use { consumer ->
            while (true) {
                // only wrap blocking call
                val records = withContext(Dispatchers.IO) { consumer.poll(timeout) }
                val jobs = records
                    .map { record ->
                        launch {
                            println("thread : ${Thread.currentThread().name}") // DefaultDispatcher-worker-x
                            send(processFunction(record))
                        }
                    }
                // we need to wait until all jobs have completed, then we can poll again
                jobs.joinAll()
            }
        }
    }

If you execute startConsume in Application.launch, without switching to Dispatchers.IO, it will probably re-use the internal threads of Netty/Ktor. You can easily verify this by printing the thread and coroutine name when consuming.

When I launch a coroutine in Application scope, it is launched on DefaultDispatcher-worker-x, not the Netty eventloop:

fun Application.kafkaConsumer() {
    println(".... starting Kafka consumer: ${Thread.currentThread().name}") // main
    launch {
        println("thread starting consumer: ${Thread.currentThread().name}") // DefaultDispatcher-worker-x
        Dependencies.kafkaKonsumerOrderAParallel.startConsume { record ->
            println("key: ${record.key()}, value: ${record.value()}")
        }
    }
}

I’m still wondering if and how it would be possible to process the Kafka messages on the Netty event loop. I had a look at your suggestion, @Skater901, at https://youtrack.jetbrains.com/issue/KTOR-809 but I couldn’t find an answer.

I’m not familiar enough with Ktor to know whether we can access the internal event loop at all. Please look for either the “coroutine context”, “coroutine scope” or “dispatcher”, because this is how coroutines expose event loops / thread pools. We could potentially hijack the scope while handling some event, but this would be an ugly workaround.

Anyway, why is this so important to you? Ideally, yes, it would be better to use one thread pool for both handling HTTP requests and consuming messages. But it shouldn’t change that much if we don’t.

Just perfecting the system. Two event loops will get in each other’s way. It’s cleaner and likely more performant to have one event loop (group) complete all jobs of the whole system.

When I learn more, I will come back and post.

If you look at the linked commit on the GitHub issue (Make NettyChannelInitializer public with EngineAPI annotation (#286) · schleinzer/ktor@6bb3b4f · GitHub) you can see that the NettyChannelInitializer class was changed from internal to public. I’m guessing that has something to do with allowing you to hook into the Netty event loop, though I don’t really know anything about Netty, so I’m not sure how it helps.

I’m curious now, though, so I’m going to do some reading and digging, and maybe I’ll come back with an answer on how you can hook into the Netty event loop. :slight_smile:

EDIT: Ok I maybe have an idea as to how to do this…

So when you create a Netty server, you create a ServerBootstrap. When you create a ServerBootstrap, you call the group function and provide two event loop groups: one that accepts incoming connections, and one that actually handles the traffic on those connections.

When Ktor creates a ServerBootstrap, it checks if the group() and childGroup() are null: ktor/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/NettyApplicationEngine.kt at 2.3.12 · ktorio/ktor · GitHub (I’m not sure what childGroup() is.) If they are, it provides its own event loop.

The ServerBootstrap is initially created and configured using configuration provided when creating the NettyApplicationEngine.

So… I think you could create a NettyApplicationEngine and provide your own event loops via the configuration, and then your code and Ktor would be using the same event loop.

Then, for how you hook your Kafka stuff in… from what I can see of the Netty docs, you use childHandler() on the ServerBootstrap to register a handler. I’m not yet sure how handlers work in terms of what triggers them, but you would need to implement the ChannelHandler interface, or extend one of the classes that implements it. (The docs show an example of extending ChannelInboundHandlerAdapter, and then registering the class using an anonymous implementation of ChannelInitializer, so there are obviously some options there.)

Netty docs for reference: Netty.docs: User guide for 4.x
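For reference, the registration pattern from the Netty user guide looks roughly like this (plain Netty, nothing Kafka- or Ktor-specific; the handler here just logs messages and passes them along):

import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel

// A handler extending ChannelInboundHandlerAdapter, registered on each accepted
// channel through a ChannelInitializer passed to childHandler().
class LoggingHandler : ChannelInboundHandlerAdapter() {
    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
        println("read on ${Thread.currentThread().name}: $msg")
        ctx.fireChannelRead(msg) // pass the message on to the next handler in the pipeline
    }

    override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
        cause.printStackTrace()
        ctx.close()
    }
}

val initializer = object : ChannelInitializer<SocketChannel>() {
    override fun initChannel(ch: SocketChannel) {
        ch.pipeline().addLast(LoggingHandler())
    }
}
// bootstrap.childHandler(initializer)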

It seems it’s quite a detour to get it working. To keep things as simple as possible, for now I’ll stick to the approach where the Kafka consumer sits in an Application module:

fun runMainKtorModules() {
    embeddedServer(Netty, port = 8080, module = Application::modules)
        .start(wait = true)
}

fun Application.modules() {
    kafkaConsumerModule()
    restApiModule()
}


fun Application.kafkaConsumerModule() {
    launch {
        startConsume { record ->
            println("key: ${record.key()}, value: ${record.value()}")
        }
    }
}

and startConsume containing a Dispatchers.IO for the blocking call to .poll(..):

while (true) {
    val records = withContext(Dispatchers.IO) { consumer.poll(timeout) }
    val jobs = records
        .map { record ->
            launch(Dispatchers.Default) { send(processFunction(record)) }
        }
    jobs.joinAll()
}

Actually with that code, I think it’s pretty easy. Check it out:

    embeddedServer(Netty, module = Application::modules, configure = {
        configureBootstrap = {
            group(EventLoopGroupProxy.create(1), EventLoopGroupProxy.create(1))

            childHandler(object : ChannelHandler {
                // Kafka code goes here
            })
        }
    })

Using the above code, you can register your Kafka handler directly into Netty, and make the Netty engine use the same event loop for everything. :slight_smile:

So it’s likely technically possible then, but we have to implement the methods below, which isn’t very pretty :slight_smile: Maybe if there is enough demand for it, the folks at Ktor can make a nice API for it.

childHandler(object : ChannelHandler {
    // Kafka code goes here
    override fun handlerAdded(ctx: ChannelHandlerContext?) {
        TODO("Not yet implemented")
    }

    override fun handlerRemoved(ctx: ChannelHandlerContext?) {
        TODO("Not yet implemented")
    }

    override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
        TODO("Not yet implemented")
    }
})

I know literally nothing about Ktor internals, but how does the above code make any sense? :smiley: First of all, putting any code inside the constructor of the ChannelHandler would be like not calling childHandler at all and running the code in configureBootstrap directly. I guess we should put it in handlerAdded, maybe(?). Second, if we still plan to use the code based on coroutines/flows, then Netty is not at all coroutine-aware and can’t run coroutines just like that. We would have to either configure Netty to use our event loop or acquire the event loop of Netty and wrap it into a coroutine dispatcher. Ktor probably does one of these, so it probably has such a dispatcher stored somewhere.
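For what it’s worth, a sketch of that first alternative: create one EventLoopGroup ourselves, hand it to Netty via configureBootstrap, and wrap the same group in a coroutine dispatcher for the consumer. This builds on your configureBootstrap snippet above; sharedGroup and the empty consumer stub are placeholders, and whether Ktor would then actually handle requests on this group is exactly what’s unclear (see below):

import io.ktor.server.application.Application
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.netty.channel.nio.NioEventLoopGroup
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch

// An EventLoopGroup is a ScheduledExecutorService, so asCoroutineDispatcher() applies to it.
// Treat this as an experiment, not a recommendation.
fun main() {
    val sharedGroup = NioEventLoopGroup(1)
    val sharedDispatcher: CoroutineDispatcher = sharedGroup.asCoroutineDispatcher()

    embeddedServer(Netty, port = 8080, module = { modules(sharedDispatcher) }, configure = {
        configureBootstrap = {
            group(sharedGroup) // use our group for accepting and handling connections
        }
    }).start(wait = true)
}

fun Application.modules(consumerDispatcher: CoroutineDispatcher) {
    launch(consumerDispatcher) {
        // startConsume(...) would then run its coroutines on the shared Netty threads,
        // as long as it doesn't switch dispatchers internally
    }
}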

I initialized a Ktor project to quickly check how it all works. First of all, I’m not even sure HTTP requests are fully handled by the Netty event loop:

get("/") {
    println("1: ${Thread.currentThread()}") // Thread[eventLoopGroupProxy-4-1,5,main]
    println("2: ${coroutineContext[ContinuationInterceptor]}") // null
    launch {
        println("3: ${Thread.currentThread()}") // Thread[DefaultDispatcher-worker-1,5,main]
    }

    call.respondText("Hello World!")
}

1 says we probably do run inside the Netty event loop, but according to 2, the coroutine dispatcher is not set, so the coroutine can easily switch to the thread pool provided by coroutines. We confirm this in 3 - even if we simply fork our coroutine into two, it already leaves the Netty event loop.

So again, it feels like you are chasing ghosts; even Ktor doesn’t assume everything has to be handled in the Netty event loop. Or maybe I’m missing something here - this is not true, see my post below.

If you are interested in how this is implemented: as I said, they provide a dispatcher that wraps the Netty event loop, NettyDispatcher. When handling a user request, they bridge the Netty code and the Ktor/coroutine code here: NettyApplicationCallHandler.handleRequest. Ktor keeps this NettyDispatcher in multiple places; for example, in the NettyApplicationEngine we see props like nettyDispatcher, workerDispatcher, userContext. Of course, all of them are private. If you get hold of them, e.g. by reflection, you can use them to dispatch your coroutine on the Netty event loop. But maybe there was actually a reason not to expose this, I don’t know.
