I managed to make it generic enough that others can extend it with their own concrete implementations. Posting it here in case someone else is struggling with the same use case.
The base interfaces and data classes:
/**
 * Storage and admission policy consulted by the `rateLimiter` coroutine.
 *
 * The rate limiter polls [canBeScheduled] before admitting each item, calls
 * [causeDelay] whenever the answer is `false`, and records every admitted
 * item through [insert].
 *
 * @param T payload type carried by the tracked items
 */
interface RateLimitBuffer<T> {
    /** Slots holding the tracked items; a `null` entry is a slot that has never been written. */
    val data: Array<RateLimitingItem<T>?>

    /** Length of the rate-limiting window; how it is applied is up to the implementation. */
    val duration: Duration

    /**
     * Records [item] in the buffer.
     *
     * @return `true` when the item was stored, `false` when the implementation rejected it
     */
    fun insert(item: RateLimitingItem<T>): Boolean

    /** Returns the item the implementation considers oldest, or `null` when there is none. */
    fun peekOldest(): RateLimitingItem<T>?

    /** Reports whether the buffer can take another item without reusing a slot. */
    fun hasCapacity(): Boolean

    /** Reports whether a new item may be admitted right now. */
    fun canBeScheduled(): Boolean

    /** Suspends the caller before it asks [canBeScheduled] again. */
    suspend fun causeDelay()
}
/**
 * A payload tracked by a rate-limit buffer, together with its timing information.
 *
 * Items are [Comparable] so that a buffer can order them, for example to find
 * the oldest one.
 *
 * @param T payload type
 */
interface RateLimitingItem<T> : Comparable<RateLimitingItem<T>> {
    /** Timestamp recorded when the item was created. */
    val startTime: Long

    /** Timestamp recorded when the item was completed; `null` until someone sets it. */
    var endTime: Long?

    /** The payload being rate limited. */
    val data: T

    /** Reports whether the item counts as completed. */
    fun isFulfilled(): Boolean
}
/**
 * What the rate limiter emits downstream: the original item plus the buffer
 * entry that tracks it.
 *
 * @property item the payload that was admitted
 * @property acknowledgement the tracking entry stored in the buffer for [item];
 *   its `endTime` is mutable, so a consumer can mark the work as finished
 */
data class Throttled<T>(
    val item: T,
    val acknowledgement: RateLimitingItem<T>
)
This is where the magic happens:
/**
 * Launches a coroutine that forwards items from [receiveChannel] to [sendChannel],
 * admitting each one only when [buffer] allows it.
 *
 * For every received item the coroutine waits (via [RateLimitBuffer.causeDelay])
 * until [RateLimitBuffer.canBeScheduled] returns `true`, builds a tracking entry
 * with [pushItemToBuffer], stores it in the buffer and sends the item wrapped in
 * a [Throttled]. The result of [RateLimitBuffer.insert] is not inspected.
 *
 * @param receiveChannel source of items; the loop ends when this channel is closed
 * @param sendChannel destination for admitted items
 * @param buffer admission policy and storage
 * @param pushItemToBuffer factory for the tracking entry; called with the current
 *   `System.nanoTime()` reading, `null`, and the item
 * @return the launched job
 */
fun <T> CoroutineScope.rateLimiter(
    receiveChannel: ReceiveChannel<T>,
    sendChannel: SendChannel<Throttled<T>>,
    buffer: RateLimitBuffer<T>,
    pushItemToBuffer: (Long, Long?, T) -> RateLimitingItem<T>
) = launch {
    for (incoming in receiveChannel) {
        // Keep backing off until the buffer is willing to admit another item.
        while (!buffer.canBeScheduled()) {
            buffer.causeDelay()
        }
        val ticket = pushItemToBuffer(System.nanoTime(), null, incoming)
        buffer.insert(ticket)
        sendChannel.send(Throttled(incoming, ticket))
    }
}
Some example implementations for the Buffer and RateLimitingItem:
/**
 * Array-backed [RateLimitBuffer] with [capacity] slots.
 *
 * A slot is writable when it is `null` or when the item in it is fulfilled.
 * Once every slot is occupied, a new item is admitted only after the oldest
 * item (per the items' natural ordering) is fulfilled and at least [duration]
 * has passed since its `endTime`.
 *
 * [insert], [peekOldest] and [hasCapacity] are individually `@Synchronized`;
 * [canBeScheduled] combines them without holding the lock across the calls.
 *
 * @param capacity number of slots; must be positive
 * @property duration minimum time between an item's `endTime` and the reuse of its slot
 * @throws IllegalArgumentException if [capacity] is not positive
 */
class RateLimiterImpl<T>(private val capacity: Int, override val duration: Duration) : RateLimitBuffer<T> {
    init {
        // causeDelay() divides by capacity, and a buffer without slots could never admit anything.
        require(capacity > 0) { "capacity must be positive, was $capacity" }
    }

    override val data: Array<RateLimitingItem<T>?> = arrayOfNulls(capacity)

    /** Number of slots that have been written at least once. */
    var size = 0
        private set

    /**
     * Stores [item] in the first empty slot, or, when none is empty, in the slot
     * of the fulfilled item with the earliest `endTime`.
     *
     * Reusing the oldest fulfilled slot (rather than an arbitrary fulfilled one)
     * keeps newer completions in the buffer, so [canBeScheduled] keeps judging
     * against the entries that still matter for the current window.
     *
     * @return `true` if the item was stored, `false` if every slot holds an unfulfilled item
     */
    @Synchronized
    override fun insert(item: RateLimitingItem<T>): Boolean {
        val emptySlot = data.indexOfFirst { it == null }
        if (emptySlot >= 0) {
            data[emptySlot] = item
            size++
            return true
        }
        val reusableSlot = indexOfOldestFulfilled()
        if (reusableSlot < 0) return false
        // Overwriting an occupied slot does not change the number of used slots.
        data[reusableSlot] = item
        return true
    }

    /**
     * Returns the smallest item according to the items' natural ordering,
     * or `null` when the buffer holds no items.
     */
    @Synchronized
    override fun peekOldest(): RateLimitingItem<T>? = data.filterNotNull().sorted().firstOrNull()

    /** Returns `true` while at least one slot has never been written. */
    @Synchronized
    override fun hasCapacity(): Boolean = size < capacity

    /**
     * Returns `true` when a new item may be admitted: either a slot is still
     * empty, or the oldest item is fulfilled and [duration] has elapsed since
     * its `endTime`.
     */
    override fun canBeScheduled(): Boolean {
        if (hasCapacity()) return true
        val oldest = peekOldest() ?: return false
        if (!oldest.isFulfilled()) return false
        // Read endTime once; an item that claims to be fulfilled but has no endTime cannot be aged.
        val finishedAt = oldest.endTime ?: return false
        return System.nanoTime() - finishedAt >= duration.toNanos()
    }

    /** Suspends for one `capacity`-th of [duration] (integer millisecond division). */
    override suspend fun causeDelay() {
        delay(duration.toMillis() / capacity)
    }

    /**
     * Index of the fulfilled item with the earliest `endTime`, or -1 when no
     * item is fulfilled. Must be called while holding the instance lock.
     */
    private fun indexOfOldestFulfilled(): Int {
        var bestIndex = -1
        var bestEnd = Long.MAX_VALUE
        for (index in data.indices) {
            val candidate = data[index] ?: continue
            if (!candidate.isFulfilled()) continue
            val end = candidate.endTime ?: Long.MAX_VALUE
            if (bestIndex == -1 || end < bestEnd) {
                bestIndex = index
                bestEnd = end
            }
        }
        return bestIndex
    }
}
/**
 * Straightforward [RateLimitingItem]: an item is fulfilled once [endTime] has been set.
 *
 * @property startTime timestamp recorded when the item was created
 * @property endTime timestamp recorded when the item was completed, or `null` while pending
 * @property data the payload being rate limited
 */
class RateLimitingItemImpl<T>(
    override val startTime: Long,
    override var endTime: Long?,
    override val data: T
) : RateLimitingItem<T> {
    /** Returns `true` once [endTime] has been set. */
    override fun isFulfilled(): Boolean = endTime != null

    /**
     * Orders items by [endTime], earliest first. Items without an end time sort
     * after every completed item and are ordered among themselves by [startTime].
     *
     * This is a consistent total order: comparing an item with itself yields 0 and
     * swapping the operands flips the sign, which sorting relies on.
     */
    override fun compareTo(other: RateLimitingItem<T>): Int {
        val mine = endTime
        val theirs = other.endTime
        return when {
            mine != null && theirs != null -> mine.compareTo(theirs)
            mine != null -> -1 // completed sorts before pending
            theirs != null -> 1 // pending sorts after completed
            else -> startTime.compareTo(other.startTime)
        }
    }
}
A test that exercises the rate limiter end to end:
/**
 * Timing test for `rateLimiter` backed by a [RateLimiterImpl] with two slots
 * and a one-second window.
 */
internal class RateLimiterKtTest {
    /**
     * Pushes 11 integers through the rate limiter and expects the consumer to
     * need between 5 and 6 seconds to receive them all.
     *
     * The consumer marks each item as finished by setting its `endTime` as soon
     * as it is received. Note that `assert` is only evaluated when JVM
     * assertions are enabled, and that "All Done!" is printed right after the
     * outer coroutine is launched rather than after it completes.
     */
    @Test
    fun rateLimiterTest() = runBlocking {
        val inputChannel = Channel<Int>()
        val outputChannel = Channel<Throttled<Int>>()
        val buffer = RateLimiterImpl<Int>(2, Duration.ofSeconds(1))

        launch {
            val jobSingleWorker = rateLimiter(inputChannel, outputChannel, buffer) { createdAt, finishedAt, payload ->
                RateLimitingItemImpl(createdAt, finishedAt, payload)
            }
            println("[${Thread.currentThread().name}]: Let's start our jobs!")

            // Consumer: receive all 11 items and acknowledge each one immediately.
            launch {
                val totalTime = measureTimeMillis {
                    repeat(11) {
                        val item = outputChannel.receive()
                        item.acknowledgement.endTime = System.nanoTime()
                        println("[${Thread.currentThread().name}]: Received ${item.item}")
                    }
                }
                assert(totalTime > 5000)
                assert(totalTime < 6000)
            }

            // Producer: feed the numbers 1..11, then close the input so the worker finishes.
            for (x in 1..11) {
                println("[${Thread.currentThread().name}]: Sending Data $x")
                inputChannel.send(x)
            }
            inputChannel.close()
            jobSingleWorker.join()
        }
        println("All Done!")
    }
}