Rate Limiting in Channels

I have been trying to build a rate-limited pipeline using coroutine Channels. The catch is that the limit is not just N requests per second; it also caps concurrency. So if N requests take longer than one second, the fixed refill interval of most token bucket libraries is not enough.

I tried creating a list that keeps track of both the time a request was made and the time its response was received. When the Channel receives a message, it checks whether any item in the list already has a response time. If so, the time elapsed since the oldest response is compared with one second. If it is larger, I can process the message. If it is not, I have to delay.
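The bookkeeping described above can be sketched as a pure function, which makes the rule easy to check before wiring it into a channel. This is a minimal sketch under assumptions, not the poster's code: `Slot`, `waitBeforeNext`, and the nullable return convention are hypothetical names chosen for illustration.

```kotlin
import java.time.Duration

// Hypothetical record of one request: when it started and, once known, when its response arrived.
data class Slot(val startNanos: Long, val endNanos: Long? = null)

/**
 * Returns how many nanoseconds to wait before another request may start.
 * 0 means "go now"; null means every slot is still in flight, so no wait time can be computed yet.
 */
fun waitBeforeNext(slots: List<Slot>, capacity: Int, window: Duration, nowNanos: Long): Long? {
    // A free slot means the concurrency limit is not reached.
    if (slots.size < capacity) return 0L
    // Otherwise look at the oldest response; if there is none, all requests are still running.
    val oldestEnd = slots.mapNotNull { it.endNanos }.minOrNull() ?: return null
    val elapsed = nowNanos - oldestEnd
    return (window.toNanos() - elapsed).coerceAtLeast(0L)
}

fun main() {
    val second = Duration.ofSeconds(1)
    val answered = listOf(Slot(0L, 200_000_000L), Slot(0L, 900_000_000L))
    // Oldest response arrived 0.8 s ago, so 0.2 s of the window remains.
    println(waitBeforeNext(answered, 2, second, 1_000_000_000L))
    // Both requests are still in flight.
    println(waitBeforeNext(listOf(Slot(0L), Slot(0L)), 2, second, 1_000_000_000L))
}
```

Keeping the decision separate from the channel code means the clock can be passed in as a parameter, so the rule can be tested without real delays.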

I am stuck on the implementation and, for some reason, the coroutines only execute halfway. Could someone give me rough guidance on how to go about implementing this?

Your issue is not very clear to me, so let me try to recap.

You have an event source that produces updates; unfortunately, these updates arrive in random order.
You want to process the oldest event each second.

If I understand correctly, your first issue is finding the oldest event.
For this task you need a RENDEZVOUS channel A; when an event occurs, you send its timestamped version there.
Then you need a CONFLATED channel B; you launch a task that consumes channel A and sends to B only the items strictly older than the previous one.

Now channel B always contains the oldest event, and you want to process it every second, so:

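launch {
    channelB.consumeEach { event ->
        // The original snippet is cut off here; the two lines below are an assumed completion.
        process(event) // hypothetical handler
        delay(1000)    // wait one second before taking the next event
    }
}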
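A minimal sketch of the two-channel idea described above, assuming a hypothetical `Event` type with a `timestamp` field and helpers named `keepOldest` and `oldestOf`; none of these appear in the thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical event type; the thread does not define one.
data class Event(val timestamp: Long, val payload: String)

// Forwards an event from A to B only when it is strictly older than every event seen so far.
fun CoroutineScope.keepOldest(a: ReceiveChannel<Event>, b: SendChannel<Event>) = launch {
    var oldest: Event? = null
    for (event in a) {
        val current = oldest
        if (current == null || event.timestamp < current.timestamp) {
            oldest = event
            b.send(event) // on a CONFLATED channel this replaces any value not yet received
        }
    }
}

// Feeds the events through A and returns whatever is left in B afterwards.
fun oldestOf(events: List<Event>): Event? = runBlocking {
    val a = Channel<Event>(Channel.RENDEZVOUS)
    val b = Channel<Event>(Channel.CONFLATED)
    val job = keepOldest(a, b)
    for (event in events) a.send(event)
    a.close()   // ends the for loop inside keepOldest
    job.join()
    b.tryReceive().getOrNull()
}

fun main() {
    val events = listOf(Event(30, "c"), Event(10, "a"), Event(20, "b"))
    println(oldestOf(events))
}
```

Because B is conflated, a slow consumer only ever sees the latest value forwarded to it, which here is the oldest event observed so far.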

@fvasco Thank you. I am sorry for being unclear the first time. I’ll post the code snippet and explain.

I tried creating an actor so that the rate-limiting logic stays inside it.

fun <T> CoroutineScope.limit(capacity: Int, duration: Duration, resultChannel: SendChannel<ThrottleOutput<T>>) =
    actor<T>(coroutineContext) {
        val buffer = RateLimitBuffer(capacity)

        // True when a slot is free, or when the oldest response is at least `duration` old.
        fun shouldExecute(): Boolean {
            println("[${Thread.currentThread().name}]: Should Execute?")
            if (buffer.hasCapacity()) return true
            val oldestItem = buffer.peekOldest()

            return when {
                oldestItem?.isFulfilled() == false -> false
                oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
                else -> false
            }
        }

        consumeEach {
            println("[${Thread.currentThread().name}]: Actor processing message!")
            while (!shouldExecute()) {
                // The loop body is missing from the post; the wait length here is an assumption.
                delay(duration.toMillis() / capacity)
            }
            if (shouldExecute()) {
                val acknowledgement = RateLimitingItem(System.nanoTime(), null)
                resultChannel.send(ThrottleOutput(it, acknowledgement))
            }
        }
    }

The RateLimitBuffer class is just a wrapper over a fixed-size array that keeps track of data objects containing the time a request was triggered and the time its response was received.

Here’s the test class:

internal class RateLimitingActorTest {
    @Test // assumed; the annotation is not visible in the post
    fun testLimit() = runBlocking {
        val resultChannel = Channel<ThrottleOutput<Int>>()

        val rateLimit = limit(2, Duration.ofSeconds(5), resultChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            rateLimit.send(x)
        }

        resultChannel.consumeEach {
            println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${it.item}")
            it.acknowledgement.endTime = System.nanoTime()
        }
    }
}

The problem with the code above is that it stops printing output and the program runs indefinitely. Here’s the output I receive:

[main @coroutine#1]: Begin processing!
[main @coroutine#2]: Actor processing message!
[main @coroutine#2]: Should Execute?
[main @coroutine#2]: Should Execute?

I understand that close() needs to be called on the actor for it to exit gracefully. However, I am not sure why it does not process the rest of the messages.

Interestingly when I increase the capacity of the actor to something greater than 10, it works for the test code. If I change the capacity to Unlimited, that works too.

I am still new to coroutines, so feel free to point out where I am going wrong :)
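One plausible reading of the hang, sketched under assumptions rather than taken from the original code: the actor's mailbox and `resultChannel` are both rendezvous channels, and the test only starts consuming `resultChannel` after it has sent all ten messages. The actor takes the first message and suspends in `resultChannel.send` because nobody is receiving yet, while the test suspends sending the second message because the actor is busy. Each side waits for the other. A larger mailbox lets the test finish its send loop and reach `consumeEach`, which would match the observation that capacities above 10 work. The helper name `sendAll` and the 500 ms timeout are made up for this demonstration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if all ten sends complete within the timeout, false if the sender gets stuck.
fun sendAll(inputCapacity: Int): Boolean = runBlocking {
    val input = Channel<Int>(inputCapacity)
    val output = Channel<Int>() // rendezvous: send suspends until someone receives

    // Stands in for the actor: forwards every input to the output channel.
    val forwarder = launch { for (x in input) output.send(x) }

    // Stands in for the test: send everything first, consume the output only afterwards.
    val sentEverything = withTimeoutOrNull(500) {
        for (x in 1..10) input.send(x)
        true
    } ?: false

    forwarder.cancel() // nobody consumes `output` in this sketch, so stop the forwarder
    sentEverything
}

fun main() {
    println("rendezvous input: ${sendAll(0)}")
    println("buffered input:   ${sendAll(16)}")
}
```

Consuming `resultChannel` from a separate coroutine that is launched before the send loop would avoid the stall without enlarging the mailbox.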

I managed to solve the problem using the concepts I understood from this great talk: KotlinConf 2018 - Kotlin Coroutines in Practice by Roman Elizarov - YouTube

I used channels to pass the messages around.

fun <T> CoroutineScope.process(capacity: Int, duration: Duration, receiveChannel: ReceiveChannel<T>, sendChannel: SendChannel<Throttled<T>>) = launch {
    val buffer = RateLimitBuffer<T>(capacity)
    for (item in receiveChannel) {
        println("Before $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
        scheduler@ while (true) {
            if (!canBeScheduled(item, buffer, duration)) {
                // The branch body is missing from the post; the wait length here is an assumption.
                delay(duration.toMillis() / capacity)
            } else break@scheduler
        }
        val acknowledgement = RateLimitingItem(item, System.nanoTime(), null)
        // Assumed step, not visible in the post: record the request so the buffer can fill up.
        buffer.insert(acknowledgement)

        sendChannel.send(Throttled(item, acknowledgement))
        println("After $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
    }
}

// Simulates the downstream request: marks each item as answered as soon as it is received.
fun <T> CoroutineScope.worker(receiveChannel: ReceiveChannel<Throttled<T>>) = launch {
    for (item in receiveChannel) {
        println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${item.item}")
        item.acknowledgement.endTime = System.nanoTime()
    }
}

// True when a slot is free, or when the oldest response is at least `duration` old.
fun <T> canBeScheduled(item: T, buffer: RateLimitBuffer<T>, duration: Duration): Boolean {
    if (buffer.hasCapacity()) return true
    val oldestItem = buffer.peekOldest()

    return when {
        oldestItem?.isFulfilled() == false -> false
        oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
        else -> false
    }
}

data class Throttled<T>(
    val item: T,
    val acknowledgement: RateLimitingItem<T>
)

data class Limiter<T>(
    val rateLimitingBuffer: RateLimitBuffer<T>,
    val duration: Duration
)

Here’s the code to see it running:

internal class NewRateLimiterKtTest {
    @Test // assumed; the annotation is not visible in the post
    fun process() = runBlocking {
        val outputChannel = Channel<Throttled<Int>>()
        val inputChannel = Channel<Int>()

        repeat(5) { worker(outputChannel) }
        val job = process(2, Duration.ofSeconds(5), inputChannel, outputChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            println("[${Thread.currentThread().name}]: Sending Data $x")
            inputChannel.send(x)
        }

        job.cancelAndJoin().also {
            // The body is missing from the post; closing the output channel so the workers can finish is an assumption.
            outputChannel.close()
        }
        println("All Done!")
    }
}


It’s not generic enough. I’ll try to make it more generic and post back later.

I managed to make it generic enough that others can extend it with their own concrete implementations. Posting it here in case someone is struggling with the same use case :)

The base interfaces and data classes:

import java.time.Duration

interface RateLimitBuffer<T> {
    val data: Array<RateLimitingItem<T>?>
    val duration: Duration

    fun insert(item: RateLimitingItem<T>): Boolean
    fun peekOldest(): RateLimitingItem<T>?
    fun hasCapacity(): Boolean

    fun canBeScheduled(): Boolean
    suspend fun causeDelay()
}

interface RateLimitingItem<T> : Comparable<RateLimitingItem<T>> {
    val startTime: Long
    var endTime: Long?
    val data: T
    fun isFulfilled(): Boolean
}

data class Throttled<T>(
        val item: T,
        val acknowledgement: RateLimitingItem<T>
)

This is where the magic happens:

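import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

fun <T> CoroutineScope.rateLimiter(
        receiveChannel: ReceiveChannel<T>,
        sendChannel: SendChannel<Throttled<T>>,
        buffer: RateLimitBuffer<T>,
        pushItemToBuffer: (Long, Long?, T) -> RateLimitingItem<T>
) = launch {
    for (item in receiveChannel) {
        // Suspend until the buffer reports that another request may start.
        scheduler@ while (true) {
            if (!buffer.canBeScheduled()) {
                buffer.causeDelay()
            } else break@scheduler
        }
        val acknowledgement = pushItemToBuffer(System.nanoTime(), null, item)

        sendChannel.send(Throttled(item, acknowledgement))
    }
}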

Some example implementations for the Buffer and RateLimitingItem:

import kotlinx.coroutines.delay
import java.time.Duration

class RateLimiterImpl<T>(private val capacity: Int, override val duration: Duration) : RateLimitBuffer<T> {
    override val data: Array<RateLimitingItem<T>?> = arrayOfNulls(capacity)
    var size = 0
        private set

    override fun insert(item: RateLimitingItem<T>): Boolean {
        // Find a writable position: an empty slot first, otherwise a slot whose item is already fulfilled.
        var writePosition = data.indexOfFirst { it == null }
        if (writePosition == -1) {
            writePosition = data.indexOfFirst { it?.isFulfilled() == true }
        }
        if (writePosition < 0) return false

        // Only filling an empty slot grows the buffer; replacing a fulfilled item keeps the size unchanged.
        if (data[writePosition] == null) size++
        data[writePosition] = item
        return true
    }

    override fun peekOldest(): RateLimitingItem<T>? = data.filterNotNull().sorted().firstOrNull()

    override fun hasCapacity(): Boolean = size < capacity

    // True when a slot is free, or when the oldest response is at least `duration` old.
    override fun canBeScheduled(): Boolean {
        if (this.hasCapacity()) return true
        val oldestItem = this.peekOldest()

        return when {
            oldestItem?.isFulfilled() == false -> false
            oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
            else -> false
        }
    }

    override suspend fun causeDelay() {
        delay(duration.toMillis() / capacity)
    }
}

class RateLimitingItemImpl<T>(override val startTime: Long, override var endTime: Long?, override val data: T) : RateLimitingItem<T> {
    override fun isFulfilled(): Boolean = endTime != null

    // Order by response time; items still waiting for a response sort last, which keeps the ordering consistent.
    override fun compareTo(other: RateLimitingItem<T>): Int =
        compareValues(endTime ?: Long.MAX_VALUE, other.endTime ?: Long.MAX_VALUE)
}

Test code to exercise the functionality:

internal class RateLimiterKtTest {

    @Test // assumed; the annotation is not visible in the post
    fun rateLimiterTest() = runBlocking {
        val outputChannel = Channel<Throttled<Int>>()
        val inputChannel = Channel<Int>()
        val buffer = RateLimiterImpl<Int>(2, Duration.ofSeconds(1))

        launch {
            val jobSingleWorker = rateLimiter(inputChannel, outputChannel, buffer) { startTime, endTime, item ->
                RateLimitingItemImpl(startTime, endTime, item)
                    // Assumed: the name pushItemToBuffer suggests the lambda also stores the item.
                    .also { buffer.insert(it) }
            }
            println("[${Thread.currentThread().name}]: Let's start our jobs!")

            launch {
                val totalTime = measureTimeMillis {
                    for (x in 1..11) {
                        val item = outputChannel.receive()
                        item.acknowledgement.endTime = System.nanoTime()
                        println("[${Thread.currentThread().name}]: Received ${item.item}")
                    }
                }
                // 11 items at 2 per second need five full windows.
                assert(totalTime > 5000)
                assert(totalTime < 6000)
            }

            for (x in 1..11) {
                println("[${Thread.currentThread().name}]: Sending Data $x")
                inputChannel.send(x)
            }
            // Assumed: the end of the block is missing from the post; closing the input lets the limiter finish.
            inputChannel.close()
        }

        println("All Done!")
    }
}

@sumit @fvasco I think we can simply use the Flow operators debounce/throttle for rate limiting. What do you say?

Replacing Channel with Flow is the way to go.
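A minimal sketch of what a Flow-based version could look like, under stated assumptions. Note that `debounce` drops intermediate values, so on its own it may not fit a case where every request has to go through. The sketch below instead uses a `Semaphore` from kotlinx.coroutines to cap concurrency and keeps each permit reserved for a window after the response arrives, which mirrors the rule from the original question. The operator name `rateLimitedMap` and its parameters are hypothetical, not part of any library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore
import kotlin.system.measureTimeMillis

// Hypothetical operator: runs `request` for each item with at most `permits` requests in flight,
// and frees a slot only `windowMillis` after that slot's response has arrived.
fun <T, R> Flow<T>.rateLimitedMap(
    permits: Int,
    windowMillis: Long,
    request: suspend (T) -> R
): Flow<R> = channelFlow {
    val slots = Semaphore(permits)
    collect { item ->
        slots.acquire()           // suspends while every slot is taken
        launch {
            send(request(item))   // emit the response downstream
            delay(windowMillis)   // keep the slot reserved for the window after the response
            slots.release()
        }
    }
}

// Runs five fake requests through the operator and reports the results and the elapsed time.
fun runDemo(): Pair<List<Int>, Long> {
    var results = emptyList<Int>()
    val elapsed = measureTimeMillis {
        results = runBlocking {
            (1..5).asFlow()
                .rateLimitedMap(permits = 2, windowMillis = 100) { it * 10 }
                .toList()
        }
    }
    return results to elapsed
}

fun main() {
    val (results, elapsed) = runDemo()
    println("results=$results elapsed=${elapsed}ms")
}
```

With two permits and a 100 ms window, the five items go out in batches of two, two, and one, so the run takes at least two full windows. Results may arrive out of order when requests take different amounts of time, since each one runs in its own coroutine.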