Rate Limiting in Channels

I have been trying to create a rate-limited pipeline using coroutine Channels. The problem is that the limit is not just N requests per second; it also caps concurrency. So if N requests take longer than one second to complete, the fixed refresh interval of most token bucket libraries does not help, because new tokens are handed out while the earlier requests are still in flight.

I tried keeping a list that records, for each request, the time it was made and the time its response was received. When the Channel receives a message, it checks whether any item in the list already has a response time. If so, the age of the oldest response is compared with the 1-second window. If the response is older than the window, I can process the message. If not, I have to delay.
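One way to picture this rule is a semaphore whose permits are handed back late. The sketch below is only an illustration under assumptions: `WindowedLimiter` and `withSlot` are hypothetical names, not something from this thread. At most `permits` calls are in flight, and a slot becomes reusable only `windowMillis` after its response has arrived.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Hypothetical helper (an assumption for illustration, not the poster's code).
class WindowedLimiter(
    permits: Int,
    private val windowMillis: Long,
    private val scope: CoroutineScope
) {
    private val slots = Semaphore(permits)

    suspend fun <R> withSlot(block: suspend () -> R): R {
        slots.acquire() // suspends while every slot is busy or still cooling down
        try {
            return block()
        } finally {
            // Give the slot back only once the window has passed since the response.
            scope.launch {
                delay(windowMillis)
                slots.release()
            }
        }
    }
}

fun main() = runBlocking {
    val limiter = WindowedLimiter(permits = 2, windowMillis = 1_000, scope = this)
    for (n in 1..4) {
        limiter.withSlot { println("request $n") }
    }
}
```

With two permits, requests 1 and 2 start immediately, while request 3 has to wait until roughly one window after request 1 finished.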

I am stuck on the implementation, and for some reason the coroutines only execute halfway. Could someone give me rough guidance on how to go about implementing this?


Your issue is not very clear to me, so let me try to recap.

You have an event source that produces updates; unfortunately, these updates are generated in random order.
You want to process the oldest event each second.

If I understand correctly, your first issue is to find the oldest event.
For this task you need a RENDEZVOUS channel A; when an event occurs, you send its timestamped version there.
Then you need a CONFLATED channel B; you launch a task that consumes channel A and sends to B only the items that are strictly older than the previous one.
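A rough sketch of that forwarding task, under assumptions: the `Event` type and the `forwardOldest` name are hypothetical, since the thread does not define either.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical event type, assumed for illustration.
data class Event(val timestamp: Long, val payload: String)

// Consumes channel A and sends to B only events strictly older than any sent before.
fun CoroutineScope.forwardOldest(a: Channel<Event>, b: Channel<Event>) = launch {
    var oldest: Event? = null
    for (event in a) {
        val current = oldest
        if (current == null || event.timestamp < current.timestamp) {
            oldest = event
            b.send(event) // CONFLATED: does not suspend, replaces the pending value
        }
    }
}

fun main() = runBlocking {
    val a = Channel<Event>(Channel.RENDEZVOUS)
    val b = Channel<Event>(Channel.CONFLATED)
    val forwarder = forwardOldest(a, b)

    for (t in listOf(30L, 10L, 20L)) a.send(Event(t, "event-$t"))
    a.close()
    forwarder.join()

    println(b.receive()) // the oldest event seen so far
}
```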

Now your channel B always contains the oldest event, and you want to process it every second, so:

launch {
 channelB.consumeEach { event ->
  process(event)
  delay(1_000)
 }
}

@fvasco Thank you. I am sorry for being unclear the first time. I’ll post the code snippet and explain.

I tried creating an actor so that the rate-limiting logic stays confined to it.

@ObsoleteCoroutinesApi
fun <T> CoroutineScope.limit(capacity: Int, duration: Duration, resultChannel: SendChannel<ThrottleOutput<T>>) =
    actor<T>(coroutineContext) {
        val buffer = RateLimitBuffer(capacity)
        fun shouldExecute(): Boolean {
            println("[${Thread.currentThread().name}]: Should Execute?")
            if (buffer.hasCapacity()) return true
            val oldestItem = buffer.peekOldest()

            return when {
                oldestItem?.isFulfilled() == false -> false
                oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
                else -> false
            }
        }

        consumeEach {
            println("[${Thread.currentThread().name}]: Actor processing message!")
            while (!shouldExecute()) {
                delay(1000)
            }
            if (shouldExecute()) {
                val acknowledgement = RateLimitingItem(System.nanoTime(), null)
                buffer.insert(acknowledgement)
                resultChannel.send(ThrottleOutput(it, acknowledgement))
            }
        }
    }

The RateLimitBuffer class is just a wrapper over a fixed-size array that keeps track of data objects containing the time each request was triggered and the time its response was received.
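Since the class itself is not shown at this point, here is a minimal stand-in to make the idea concrete. Everything in it is an assumption for illustration: the names `Slot` and `SlotBuffer`, and the injectable `now` clock, which is only there so the logic can be checked without waiting. Checking whether any finished slot is older than the window is equivalent to checking the oldest response.

```kotlin
// Hypothetical stand-in for the buffer described above (names are assumptions).
class Slot(val startTime: Long, var endTime: Long? = null)

class SlotBuffer(
    capacity: Int,
    private val windowNanos: Long,
    private val now: () -> Long = { System.nanoTime() }
) {
    private val slots = arrayOfNulls<Slot>(capacity)

    // A slot is reusable once its response is at least one window old.
    private fun Slot.isExpired(): Boolean {
        val end = endTime ?: return false // request still in flight
        return now() - end >= windowNanos
    }

    // A request may start if some slot is empty or expired.
    fun canStart(): Boolean = slots.any { it == null || it.isExpired() }

    // Records a new request in a free or expired slot; returns null if none is available.
    fun start(): Slot? {
        val index = slots.indexOfFirst { it == null || it.isExpired() }
        if (index < 0) return null
        return Slot(now()).also { slots[index] = it }
    }
}

fun main() {
    var clock = 0L
    val buffer = SlotBuffer(capacity = 2, windowNanos = 1_000, now = { clock })
    val first = buffer.start()
    buffer.start()
    println(buffer.canStart()) // false: both slots are in flight
    first?.endTime = 100
    clock = 1_100
    println(buffer.canStart()) // true: the first response is one window old
}
```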

Here’s the test class:

internal class RateLimitingActorTest {
    @Test
    fun testLimit() = runBlocking {
        val resultChannel = Channel<ThrottleOutput<Int>>()

        val rateLimit = limit(2, Duration.ofSeconds(5), resultChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            rateLimit.send(x)
        }

        resultChannel.consumeEach {
            println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${it.item}")
            it.acknowledgement.endTime = System.nanoTime()
        }
    }
}

The problem with the code above is that it stops printing output while the program keeps running indefinitely. Here’s the output I receive:

[main @coroutine#1]: Begin processing!
[main @coroutine#2]: Actor processing message!
[main @coroutine#2]: Should Execute?
[main @coroutine#2]: Should Execute?

I understand that close() needs to be called on the actor for it to exit gracefully. However, I am not sure why it does not process the rest of the messages.

Interestingly when I increase the capacity of the actor to something greater than 10, it works for the test code. If I change the capacity to Unlimited, that works too.
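A likely explanation, offered as a sketch rather than a verified diagnosis: both the actor's mailbox and resultChannel are rendezvous channels (capacity 0), and the test sends all ten items before it starts consuming resultChannel. The actor takes item 1 and suspends in resultChannel.send because nobody is receiving yet; the test then suspends in rateLimit.send(2) because the actor is no longer receiving. That would also match why a larger actor capacity hides the problem. The reproduction below uses plain channels with hypothetical names (`relay`, `pipeline`) and moves the producer loop into its own coroutine so the consumer can run.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Minimal stand-in for the actor: forwards every item from input to output.
fun CoroutineScope.relay(input: Channel<Int>, output: Channel<Int>) = launch {
    for (item in input) output.send(item) // suspends until someone receives
    output.close()
}

suspend fun pipeline(): List<Int> = coroutineScope {
    val input = Channel<Int>()  // rendezvous, capacity 0
    val output = Channel<Int>() // rendezvous, capacity 0
    relay(input, output)

    // The producer gets its own coroutine, so this coroutine stays free to consume.
    launch {
        for (x in 1..10) input.send(x)
        input.close()
    }

    val received = mutableListOf<Int>()
    for (item in output) received += item
    received
}

fun main() = runBlocking {
    println(pipeline())
}
```

If the producer loop ran inline before the consuming loop, as in the test above, the second send would never return.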

I am still new to coroutines, so feel free to point out where I am going wrong.

I managed to solve the problem using the concepts I understood from this great talk: KotlinConf 2018 - Kotlin Coroutines in Practice by Roman Elizarov - YouTube

I used channels to pass the messages between coroutines.

fun <T> CoroutineScope.process(capacity: Int, duration: Duration, receiveChannel: ReceiveChannel<T>, sendChannel: SendChannel<Throttled<T>>) = launch {
    val buffer = RateLimitBuffer<T>(capacity)
    for (item in receiveChannel) {
        println("Before $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
        // Wait until the buffer has a free or expired slot.
        while (!canBeScheduled(item, buffer, duration)) {
            delay(100)
        }
        val acknowledgement = RateLimitingItem(item, System.nanoTime(), null)

        buffer.insert(acknowledgement)
        sendChannel.send(Throttled(item, acknowledgement))
        println("After $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
    }
}

fun <T> CoroutineScope.worker(receiveChannel: ReceiveChannel<Throttled<T>>) = launch {
    for (item in receiveChannel) {
        println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${item.item}")
        item.acknowledgement.endTime = System.nanoTime()
    }
}

fun <T> canBeScheduled(item: T, buffer: RateLimitBuffer<T>, duration: Duration): Boolean {
    if (buffer.hasCapacity()) return true
    val oldestItem = buffer.peekOldest()

    return when {
        oldestItem?.isFulfilled() == false -> false
        oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
        else -> false
    }
}


data class Throttled<T>(
    val item: T,
    val acknowledgement: RateLimitingItem<T>
)

data class Limiter<T>(
    val rateLimitingBuffer: RateLimitBuffer<T>,
    val duration: Duration
)

Here’s the code to see it running:

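internal class NewRateLimiterKtTest {
    // Named processTest so it does not share a name with the process() extension above.
    @Test
    fun processTest() = runBlocking {
        val outputChannel = Channel<Throttled<Int>>()
        val inputChannel = Channel<Int>()

        repeat(5) { worker(outputChannel) }
        val job = process(2, Duration.ofSeconds(5), inputChannel, outputChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            println("[${Thread.currentThread().name}]: Sending Data $x")
            inputChannel.send(x)
        }

        // Close the input first so the limiter forwards any item it is still holding
        // and then finishes on its own; cancelling it here could drop that item.
        inputChannel.close()
        job.join()
        // Closing the output ends the workers' for-loops so runBlocking can return.
        outputChannel.close()
        println("All Done!")
    }
}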

It’s not generic enough. I’ll try to make it more generic and post back later.

I managed to make it generic enough that others can extend it with their own concrete implementations. Posting it here in case someone is struggling with the same use case.

The base interfaces and data classes:

interface RateLimitBuffer<T> {
    val data: Array<RateLimitingItem<T>?>
    val duration: Duration

    fun insert(item: RateLimitingItem<T>): Boolean
    fun peekOldest(): RateLimitingItem<T>?
    fun hasCapacity(): Boolean

    fun canBeScheduled(): Boolean
    suspend fun causeDelay()
}

interface RateLimitingItem<T> : Comparable<RateLimitingItem<T>> {
    val startTime: Long
    var endTime: Long?
    val data: T
    fun isFulfilled(): Boolean
}

data class Throttled<T>(
        val item: T,
        val acknowledgement: RateLimitingItem<T>
)

This is where the magic happens:

fun <T> CoroutineScope.rateLimiter(
        receiveChannel: ReceiveChannel<T>,
        sendChannel: SendChannel<Throttled<T>>,
        buffer: RateLimitBuffer<T>,
        pushItemToBuffer: (Long, Long?, T) -> RateLimitingItem<T>
) = launch {
    for (item in receiveChannel) {
        // Wait until the buffer reports that a new request may start.
        while (!buffer.canBeScheduled()) {
            buffer.causeDelay()
        }
        val acknowledgement = pushItemToBuffer(System.nanoTime(), null, item)

        buffer.insert(acknowledgement)
        sendChannel.send(Throttled(item, acknowledgement))
    }
}

Some example implementations for the Buffer and RateLimitingItem:

class RateLimiterImpl<T>(private val capacity: Int, override val duration: Duration) : RateLimitBuffer<T> {
    override val data: Array<RateLimitingItem<T>?> = arrayOfNulls(capacity)
    var size = 0
        private set

    @Synchronized
    override fun insert(item: RateLimitingItem<T>): Boolean {
        // Find a writable position: prefer an empty slot, otherwise reuse
        // the first slot whose request is already fulfilled.
        var writePosition = data.indexOfFirst { it == null }
        if (writePosition == -1) {
            writePosition = data.indexOfFirst { it?.isFulfilled() == true }
        }
        if (writePosition < 0) return false

        // Only filling a previously empty slot increases the occupied count.
        if (data[writePosition] == null) size++
        data[writePosition] = item
        return true
    }

    @Synchronized
    override fun peekOldest(): RateLimitingItem<T>? {
        val sortedNonNullList = data.filterNotNull().sorted()
        return when {
            sortedNonNullList.isNotEmpty() -> sortedNonNullList[0]
            else -> null
        }
    }

    @Synchronized
    override fun hasCapacity(): Boolean {
        return size < capacity
    }

    override fun canBeScheduled(): Boolean {
        if (this.hasCapacity()) return true
        val oldestItem = this.peekOldest()

        return when {
            oldestItem?.isFulfilled() == false -> false
            oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
            else -> false
        }
    }

    override suspend fun causeDelay() {
        delay(duration.toMillis() / capacity)
    }

}


class RateLimitingItemImpl<T>(override val startTime: Long, override var endTime: Long?, override val data: T) : RateLimitingItem<T> {
    override fun isFulfilled(): Boolean {
        return endTime != null
    }

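    override fun compareTo(other: RateLimitingItem<T>): Int {
        // Order by response time. Unfulfilled items (null endTime) sort last so the
        // ordering stays consistent; returning -1 whenever either side is null
        // breaks the Comparable contract that sorted() relies on.
        return compareValues(endTime ?: Long.MAX_VALUE, other.endTime ?: Long.MAX_VALUE)
    }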
}

Test code to test the functionality:

internal class RateLimiterKtTest {

    @Test
    fun rateLimiterTest() = runBlocking {
        val outputChannel = Channel<Throttled<Int>>()
        val inputChannel = Channel<Int>()
        val buffer = RateLimiterImpl<Int>(2, Duration.ofSeconds(1))

        launch {
            val jobSingleWorker = rateLimiter(inputChannel, outputChannel, buffer) { startTime, endTime, item ->
                RateLimitingItemImpl(startTime, endTime, item)
            }
            println("[${Thread.currentThread().name}]: Let's start our jobs!")

            launch {
                val totalTime = measureTimeMillis {
                    for (x in 1..11) {
                        val item = outputChannel.receive()
                        item.acknowledgement.endTime = System.nanoTime()
                        println("[${Thread.currentThread().name}]: Received ${item.item}")
                    }
                }
                assert(totalTime > 5000)
                assert(totalTime < 6000)
            }


            for (x in 1..11) {
                println("[${Thread.currentThread().name}]: Sending Data $x")
                inputChannel.send(x)
            }
            inputChannel.close()
            jobSingleWorker.join()
        }

        println("All Done!")
    }
}

@sumit @fvasco I think we can simply use the Flow operator debounce/throttle for rate limiting. What do you think?

Replacing Channel with Flow is the way to go.
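For anyone trying the Flow route, a small sketch under assumptions. Note that debounce drops values that arrive in quick succession instead of delaying them, and as far as I know kotlinx.coroutines has no built-in throttle operator. The `rateLimited` operator below is hypothetical, not a library function: it keeps every value and pauses after each emission. It only covers the rate part, not the concurrency cap discussed earlier in the thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator (an assumption, not part of kotlinx.coroutines):
// lets at most one value through per periodMillis without dropping any.
fun <T> Flow<T>.rateLimited(periodMillis: Long): Flow<T> = flow {
    this@rateLimited.collect { value ->
        emit(value)
        delay(periodMillis) // upstream is suspended while we wait
    }
}

fun main() = runBlocking {
    (1..5).asFlow()
        .rateLimited(periodMillis = 200)
        .collect { println("processing $it") }
}
```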