Rate Limiting in Channels


#1

I have been trying to build a rate-limited pipeline using coroutine Channels. The catch is that the limit is not just N requests per second; it also caps concurrency. So if N requests take longer than one second to complete, the fixed refill interval that most token bucket libraries use does not work for this case.

I tried keeping a list that records, for each request, the time it was made and the time its response was received. When the Channel receives a message, it checks whether any item in the list already has a response time. If so, the time elapsed since the oldest response is compared with the 1-second window. If more than 1 second has passed, I can process the message. If not, I have to delay.

I am stuck on the implementation: for some reason, the coroutines only execute halfway. Could someone give me rough guidance on how to implement this?


#2

Your issue is not very clear to me, so let me try to recap.

You have an event source that produces some updates; unfortunately, these updates are generated in random order.
You want to process the oldest event each second.

If I understand correctly, your first issue is finding the oldest event.
For this task you need a RENDEZVOUS channel A; when an event occurs, you send its timestamped version there.
Then you need a CONFLATED channel B; you launch a task that consumes channel A and sends to B only the items that are strictly older than the previous one.

Now channel B always contains the oldest event. You want to process it every second, so:

launch {
 channelB.consumeEach { event ->
  process(event)
  delay(1_000)
 }
}

#3

@fvasco Thank you. I am sorry for being unclear the first time. I’ll post the code snippet and explain.

I tried creating an actor so that the rate-limiting logic stays confined to the actor.

@ObsoleteCoroutinesApi
fun <T> CoroutineScope.limit(capacity: Int, duration: Duration, resultChannel: SendChannel<ThrottleOutput<T>>) =
    actor<T>(coroutineContext) {
        val buffer = RateLimitBuffer(capacity)
        fun shouldExecute(): Boolean {
            println("[${Thread.currentThread().name}]: Should Execute?")
            if (buffer.hasCapacity()) return true
            val oldestItem = buffer.peekOldest()

            return when {
                oldestItem?.isFulfilled() == false -> false
                oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
                else -> false
            }
        }

        consumeEach {
            println("[${Thread.currentThread().name}]: Actor processing message!")
            while (!shouldExecute()) {
                delay(1000)
            }
            if (shouldExecute()) {
                val acknowledgement = RateLimitingItem(System.nanoTime(), null)
                buffer.insert(acknowledgement)
                resultChannel.send(ThrottleOutput(it, acknowledgement))
            }
        }
    }

The RateLimitBuffer class is just a wrapper over a fixed-size array that keeps track of data objects containing the time the request was triggered and the time the response was received.
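
Since the buffer itself is not shown in the thread, here is a minimal sketch of what such a wrapper might look like. Everything in it is an assumption made for illustration: the field names, the ring-buffer layout, and in particular the choice that insert() overwrites the oldest slot once the buffer is full. Only the method names (hasCapacity, peekOldest, insert, isFulfilled) come from the snippet above.

```kotlin
// Hypothetical reconstruction: the original classes are not shown in the thread.
class RateLimitingItem(val startTime: Long, var endTime: Long?) {
    // An item counts as fulfilled once its response time has been recorded.
    fun isFulfilled(): Boolean = endTime != null
}

class RateLimitBuffer(private val capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val slots = arrayOfNulls<RateLimitingItem>(capacity)
    private var next = 0 // slot the next insert will write to
    private var size = 0 // number of slots filled so far

    fun hasCapacity(): Boolean = size < capacity

    // While the buffer is filling up, the oldest item sits in slot 0.
    // Once it is full, `next` points at the item inserted longest ago.
    fun peekOldest(): RateLimitingItem? =
        if (size < capacity) slots[0] else slots[next]

    // Assumption: when full, the oldest item is overwritten.
    fun insert(item: RateLimitingItem) {
        slots[next] = item
        next = (next + 1) % capacity
        if (size < capacity) size++
    }
}

fun main() {
    val buffer = RateLimitBuffer(2)
    val first = RateLimitingItem(System.nanoTime(), null)
    buffer.insert(first)
    buffer.insert(RateLimitingItem(System.nanoTime(), null))

    println(buffer.hasCapacity())               // false
    println(buffer.peekOldest() === first)      // true
    first.endTime = System.nanoTime()
    println(buffer.peekOldest()?.isFulfilled()) // true
}
```

With this layout, shouldExecute() only has to look at one slot: the oldest one. This sketch is not thread-safe, which is fine as long as the buffer is only touched from a single coroutine, as in the actor above.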

Here’s the test class:

internal class RateLimitingActorTest {
    @Test
    fun testLimit() = runBlocking {
        val resultChannel = Channel<ThrottleOutput<Int>>()

        val rateLimit = limit(2, Duration.ofSeconds(5), resultChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            rateLimit.send(x)
        }

        resultChannel.consumeEach {
            println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${it.item}")
            it.acknowledgement.endTime = System.nanoTime()
        }
    }
}

The problem with the code above is that it stops printing output partway through and the program then hangs indefinitely. Here’s the output I receive:

[main @coroutine#1]: Begin processing!
[main @coroutine#2]: Actor processing message!
[main @coroutine#2]: Should Execute?
[main @coroutine#2]: Should Execute?

I understand that close() needs to be called on the actor for it to exit gracefully. However, I am not sure why it does not process the rest of the messages.

Interestingly, when I increase the capacity of the actor to something greater than 10, the test code works. If I change the capacity to Unlimited, that works too.

I am still new to Coroutines, so feel free to point out where I am going wrong :)


#4

I managed to solve the problem using the concepts I understood from this great talk: https://www.youtube.com/watch?v=a3agLJQ6vt8

I dropped the actor and used plain channels to pass the messages between coroutines.

fun <T> CoroutineScope.process(capacity: Int, duration: Duration, receiveChannel: ReceiveChannel<T>, sendChannel: SendChannel<Throttled<T>>) = launch {
    val buffer = RateLimitBuffer<T>(capacity)
    for (item in receiveChannel) {
        println("Before $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
        // Poll until the limiter allows this item through.
        while (!canBeScheduled(item, buffer, duration)) {
            delay(100)
        }
        val acknowledgement = RateLimitingItem(item, System.nanoTime(), null)

        buffer.insert(acknowledgement)
        sendChannel.send(Throttled(item, acknowledgement))
        println("After $item: ${buffer.data.mapIndexed { index, rateLimitingItem ->  "$index, ${rateLimitingItem?.startTime},  ${rateLimitingItem?.endTime} ::::"}}")
    }
}

fun <T> CoroutineScope.worker(receiveChannel: ReceiveChannel<Throttled<T>>) = launch {
    for (item in receiveChannel) {
        println("[${Thread.currentThread().name}]: Consume Result from destination Channel : ${item.item}")
        item.acknowledgement.endTime = System.nanoTime()
    }
}

fun <T> canBeScheduled(item: T, buffer: RateLimitBuffer<T>, duration: Duration): Boolean {
    if (buffer.hasCapacity()) return true
    val oldestItem = buffer.peekOldest()

    return when {
        oldestItem?.isFulfilled() == false -> false
        oldestItem?.isFulfilled() == true && System.nanoTime().minus(oldestItem.endTime!!) >= duration.toNanos() -> true
        else -> false
    }
}


data class Throttled<T>(
    val item: T,
    val acknowledgement: RateLimitingItem<T>
)

data class Limiter<T>(
    val rateLimitingBuffer: RateLimitBuffer<T>,
    val duration: Duration
)

Here’s the code to see it running:

internal class NewRateLimiterKtTest {
    @Test
    fun process() = runBlocking {
        val outputChannel = Channel<Throttled<Int>>()
        val inputChannel = Channel<Int>()

        repeat(5) { worker(outputChannel) }
        val job = process(2, Duration.ofSeconds(5), inputChannel, outputChannel)

        println("[${Thread.currentThread().name}]: Begin processing!")
        for (x in 1..10) {
            println("[${Thread.currentThread().name}]: Sending Data $x")
            inputChannel.send(x)
        }

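        // Close the input so process() can drain its last item and finish,
        // rather than cancelling it while that item may still be waiting.
        inputChannel.close()
        job.join()
        // Closing the output lets the workers' for loops end.
        outputChannel.close()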
        println("All Done!")

    }
}

It’s not generic enough. I’ll try to make it more generic and post back later.
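
For anyone comparing the two tests: the earlier one sends all ten items before anything reads resultChannel. Both channels there are rendezvous channels (the default capacity), so the actor appears to get stuck in resultChannel.send while the test is stuck in rateLimit.send, which would also explain why a larger actor capacity made it work. The version above starts its workers before sending. Below is a minimal sketch of just that ordering, assuming kotlinx.coroutines is on the classpath. The runPipeline function and the pass-through stage are hypothetical stand-ins for the rate limiter, not part of the code above.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pushes 1..n through a pass-through stage
// and returns what the consumer received.
fun runPipeline(n: Int): List<Int> = runBlocking {
    val input = Channel<Int>()  // rendezvous: send() suspends until a receiver takes the item
    val output = Channel<Int>() // rendezvous as well
    val received = mutableListOf<Int>()

    // Stage standing in for the rate limiter: forwards input to output.
    val stage = launch {
        for (x in input) output.send(x)
        output.close() // no more items will arrive, so let the consumer finish
    }

    // The consumer is launched BEFORE the producer loop starts sending,
    // so output.send() in the stage always finds a receiver.
    val consumer = launch {
        for (x in output) received += x
    }

    for (x in 1..n) input.send(x)
    input.close() // ends the stage's for loop once it has drained the input

    stage.join()
    consumer.join()
    received
}

fun main() {
    println(runPipeline(10)) // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```

If the consumer were launched only after the send loop, the stage would suspend on its first output.send and the loop would suspend on its second input.send, with neither able to make progress.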