Custom function to update data in pending kotlin channel buffer

I have an UNLIMITED size buffered channel where senders are much faster than receivers. I would like to update the buffer by removing old data and replacing it with newer one (if the receiver does not yet consume it)

Here is my code

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

data class Item(val id: Int, val value: Int)
val testData = listOf(
    Item(1, 10),
    Item(2, 24),
    Item(3, 12),
    Item(1, 17), // This one should replace the Item(1, 10) if it's not yet consumed
    Item(4, 16),
    Item(2, 32), // This one should replace the Item(2, 24) if it's not yet consumed
)

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)

    launch {
        for(item in testData) {
            delay(50)
            println("Producing item $item")
            channel.send(item)
        }
    }

    // As you can see the receivers already sent all the testData and they are waiting in the buffer to be consumed by the receiver.
    // I would like to do some checks whenever new item is added to the buffer 
    // if(itemInBuffer.id == newItem.id && itemInBuffer.value < newItem.value) then replace it with newItem
    
    launch {
        for (item in channel) {
            delay(5000)
            println(item.toString())
        }
    }
}

Is there any kotlin built function which takes some custom condition and removes items from the buffer? I saw there is a function called distinctUntilChangedBy in flow which removes the duplicate data based on the custom key selector. Is there anything similar available for Channel or Is it possible to achieve it with ChannelFlow (Note: in my real code events are comes from some network calls so I’m not sure channelFlow could be suitable there)

Note this is not as easy as it sounds, because we can’t easily find the item in the queue, without iterating through all of them or creating additional data structure, most probably a map. distinctUntilChangedBy() is different, because it only compares with the last item, not with all items in the queue.

Do you have any requirements on when the replaced item should be consumed? Should it be this:

1, 17
2, 32
3, 12
4, 16

or this:

3, 12
1, 17
4, 16
2, 32

?

@broot Both are ok, I just want avoid the item with same id. So I wanna replace the already existing one with new item if the value is greater than previous one

I answered in StackOverflow as you asked this there as well: Custom function to update data in pending kotlin channel buffer - Stack Overflow

Consider the channel-replacing primitive proposed in this comment [Proposal] Primitive or Channel that guarantees the delivery and processing of items · Issue #2886 · Kotlin/kotlinx.coroutines · GitHub (for other reasons but whatever).

Starting from that, it’s super trivial to implement replacement of existing entries: just add a synchronized block to access the back buffer and do your modifications there: if an item is already received, it will not be in the buffer, if it’s not it will be in the buffer.

If your replacement code is more complex (such as if you want to keep the counter for consumed objects to make replacement more efficient) you can just put it in the synchronized block and it would work.
Only downsides:
It doesn’t adhere to the channel interface
It doesn’t have a close method (but you can implement it if you want) (or just send null)