I have an UNLIMITED
size buffered channel where senders are much faster than receivers. I would like to update the buffer by removing old data and replacing it with newer one (if the receiver does not yet consume it)
Here is my code
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
data class Item(val id: Int, val value: Int)
val testData = listOf(
Item(1, 10),
Item(2, 24),
Item(3, 12),
Item(1, 17), // This one should replace the Item(1, 10) if it's not yet consumed
Item(4, 16),
Item(2, 32), // This one should replace the Item(2, 24) if it's not yet consumed
)
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Item>(Channel.UNLIMITED)
launch {
for(item in testData) {
delay(50)
println("Producing item $item")
channel.send(item)
}
}
// As you can see the receivers already sent all the testData and they are waiting in the buffer to be consumed by the receiver.
// I would like to do some checks whenever new item is added to the buffer
// if(itemInBuffer.id == newItem.id && itemInBuffer.value < newItem.value) then replace it with newItem
launch {
for (item in channel) {
delay(5000)
println(item.toString())
}
}
}
Is there any kotlin built function which takes some custom condition and removes items from the buffer? I saw there is a function called distinctUntilChangedBy in flow which removes the duplicate data based on the custom key selector. Is there anything similar available for Channel
or Is it possible to achieve it with ChannelFlow
(Note: in my real code events are comes from some network calls so I’m not sure channelFlow
could be suitable there)