Flows: Combining distinctUntilChanged and first() to get single distinct values from a SharedFlow

I have a SharedFlow that forwards messages that were received from the network. Sometimes, these messages are repeated. Some subscribers expect these duplicates, others need them filtered out. (The values are null in case the received messages were found to be invalid.)

Some coroutines only occasionally need to sample the most recent flow value (that is, the most recently received message).
These messages must be parsed, but I do not want to parse every message as soon as it arrives, because the parse result is only needed when a coroutine actually asks for it, and parsing everything up front wastes CPU cycles. I use buffer, map, and first for that, like this:

val myFlow = theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST).map { parseMessage(it) }

val value = myFlow.first()

This works well. However, I do not know how to integrate duplicate detection in a flow. (Duplicates are detected by comparing classes.) My initial idea was to use distinctUntilChanged:

val myFlow = theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST).map { parseMessage(it) }.distinctUntilChanged { old, new ->
    ((old == null) && (new == null)) ||
    ((old != null) && (new != null) && (old::class == new::class))
}

val value = myFlow.first()

However, this does not work, since first cancels the distinctUntilChanged flow after getting a value. So, I am confused as to how to do that. I can’t run distinctUntilChanged before buffer, since as said, this would effectively run the parse function every time a message arrives.

So, I want to do the duplicate check on-demand, when I actually need a value. This means then that the previous value has to be stored somehow. But since first cancels the collection after getting an item, I do not see how I can do that. Any ideas? A second buffer maybe?

Not following your use case 100%. Do you just want to have the previous message available?

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

// Pairs each element with the one before it; (-1 to -1) is just a seed that drop(1) removes.
val flowWithPrevious = flowOf(1, 2, 3, 4, 5)
    .scan(-1 to -1) { acc, next -> acc.second to next }
    .drop(1)

fun main() = runBlocking {
    println(flowWithPrevious.toList())
}

Keep in mind that if you don’t see the operator you want, it’s often pretty easy to write your own. Take a look at the 6 trivial lines of source code for scan for inspiration.
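
To make the "write your own" point concrete, here is a minimal sketch of a custom operator built with the flow builder. The name withPrevious is made up for illustration and is not part of kotlinx.coroutines. Note that the state it keeps is created anew for every collection, so it does not survive separate first() calls.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not in kotlinx.coroutines): pairs each element
// with the element before it, using null for the very first one.
fun <T> Flow<T>.withPrevious(): Flow<Pair<T?, T>> = flow {
    var previous: T? = null      // state is local to one collection
    collect { value ->
        emit(previous to value)
        previous = value
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2, 3).withPrevious().toList())
}
```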

The source is a hot flow, a SharedFlow, and I want to be able to get one value at any moment - but always exactly one (the most recent one). That’s why I do val myFlow = theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST). This keeps collecting the latest value. And I need to parse that value - I’m actually interested in the parsed result. So, I add map { parseMessage(it) }. I put that one behind the buffer because the parseMessage call needs to run on demand (that is, when I query a value), not every time a new message arrives, because parsing is expensive. But incoming values may contain the same content, and I know that only after parsing. I want to weed out duplicates. My current logic therefore is something like:

    var previousParsedValue: ParsedValue? = null
    val myFlow = theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST).map { parseMessage(it) }

    suspend fun getNextUniqueValue(): ParsedValue {
        while (true) {
            // myFlow already applies parseMessage in its map step, so first() yields the parsed value
            val parsedValue = myFlow.first()

            if ((previousParsedValue != null) && (previousParsedValue == parsedValue))
                continue

            previousParsedValue = parsedValue

            return parsedValue
        }   
    }

Then, whenever I call getNextUniqueValue, messages are parsed only when that call is made, and only non-duplicate parsed values are returned.

My question now is if it is possible to fully put such a logic into a flow. There must be a terminal operator that returns exactly one value - the most recent value, like first does.

I think you may be misunderstanding how operators work on hot flows.

Even if theSharedFlow is hot, myFlow is not, because nothing is continuously collecting it and thereby causing it to run.

When you call first, only then does myFlow become active. The buffer and map calls are not doing anything before that.
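
A small sketch of that behavior, assuming a MutableSharedFlow with replay = 1 as a stand-in for theSharedFlow and a counting parse function as a stand-in for parseMessage (both are illustrative, not your actual code). The counter should stay at zero until first() is called:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

var parseCalls = 0               // how often the stand-in parser ran

// Hypothetical stand-in for parseMessage
fun parse(raw: String): Int { parseCalls++; return raw.length }

// Returns the parse count before and after the first collection.
fun parseCallsBeforeAndAfterFirst(): Pair<Int, Int> = runBlocking {
    parseCalls = 0
    val source = MutableSharedFlow<String>(replay = 1)
    val parsed = source.buffer(1, BufferOverflow.DROP_OLDEST).map { parse(it) }

    source.emit("a"); source.emit("bb"); source.emit("ccc")
    val before = parseCalls      // nothing has collected `parsed` yet
    parsed.first()               // only now do buffer and map run
    before to parseCalls
}

fun main() = println(parseCallsBeforeAndAfterFirst())
```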

If you want to keep a most recent value cached for the sake of future collection, you need a SharedFlow specifically for that.

Instead of theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST), try theSharedFlow.shareIn(myScope, SharingStarted.Eagerly, 1). This gives you a coroutine that is always collecting and keeping the most recent value.

Then when you collect, if you have a predicate for your “first” value you want to collect, you can use the overload that takes a predicate.

Also, if parsing is expensive, consider keeping the unparsed message that produced previousParsedValue around as well, so that you can skip re-parsing the cached value when it is the very same instance (compared with ===). I also recommend avoiding while(true) here, because first can keep re-returning the same buffered value without suspending, which makes the loop spin.

Also keep in mind that any operator becomes a terminal operator if you just stick .collect() after it :).

Here’s an example I think could help:

    var lastUniqueMessage: Message? = null //Kept for pre-parsing comparison
    var lastUniqueParsedValue: ParsedValue? = null //Renamed for my personal clarity

    //Always running and caching the single most recent message
    val myFlow = theSharedFlow.shareIn(myScope, SharingStarted.Eagerly, 1)

    suspend fun getNextUniqueValue(): ParsedValue {
        //takeWhile will not spin like while(true)
        //first { ... } would also work but isn't as readable IMO.
        myFlow.takeWhile {
            //Don't bother parsing if it's the same message
            if (lastUniqueMessage == null || it !== lastUniqueMessage) {
                val parsedValue = parseMessage(it)
                if (parsedValue != lastUniqueParsedValue) {
                    lastUniqueMessage = it
                    lastUniqueParsedValue = parsedValue
                    return@takeWhile false
                }
            }
            return@takeWhile true
        }.collect()
        //Non-null here: takeWhile only stops right after a parsed value was stored
        return lastUniqueParsedValue!!
    }

Thanks, this indeed helps. I was aware that the map call was “cold”, but I thought buffer would be running, since the documentation says:

The buffer operator creates a separate coroutine during execution for the flow it applies to.

In any case, this is a good solution. I think I’d still prefer to use first rather than collect, since with the former, I can directly return a value.
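
For reference, a rough sketch of what the first-based variant could look like. Message is replaced by String here, and ParsedValue, parseMessage and UniqueValueReader are stand-ins made up for the example. The map sits downstream of the cached flow, so parsing still only happens on demand. Unlike the takeWhile version, this sketch does not skip re-parsing an already seen message instance. The replay of 2 is only there so the demo contains a duplicate without needing extra coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the thread's ParsedValue and parseMessage.
data class ParsedValue(val text: String)
fun parseMessage(raw: String): ParsedValue = ParsedValue(raw.trim())

class UniqueValueReader(private val cached: SharedFlow<String>) {
    private var lastUniqueParsedValue: ParsedValue? = null

    suspend fun getNextUniqueValue(): ParsedValue {
        val value = cached
            .map { parseMessage(it) }                 // runs only while collecting
            .first { it != lastUniqueParsedValue }    // suspends past duplicates
        lastUniqueParsedValue = value
        return value
    }
}

fun main() = runBlocking {
    // replay = 2 only so that the demo below can observe a duplicate
    val source = MutableSharedFlow<String>(replay = 2)
    val reader = UniqueValueReader(source)

    source.emit("hello")
    println(reader.getNextUniqueValue())
    source.emit(" hello ")    // parses to the same value, gets skipped
    source.emit("world")
    println(reader.getNextUniqueValue())
}
```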

Out of curiosity, is it possible to somehow place lastUniqueMessage and lastUniqueParsedValue inside a flow operator? That way, those states would not have to be explicitly kept around. I’m still learning about flows, and the ability to store state like that is something I don’t know about.


“During execution for the flow it applies to” means inside collect. Only the terminal operators (which don’t return a Flow) and the sharing operators (which take a CoroutineScope) actually do anything immediately.

If I wanted to bundle that pattern up, I suppose I might stick that code inside a Flow extension method that also takes a CoroutineScope parameter and returns a suspend () -> ParsedValue (or fun interface equivalent) where getNextUniqueValue becomes the returned lambda.
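
A rough sketch of that idea, under a few assumptions: the name uniqueValueGetter and the parse parameter are made up for illustration, and the returned function is typed as suspend because it has to wait for values. The two state variables live in the closure rather than in the caller.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not a library function): shares the upstream eagerly
// and returns a suspending getter for the next non-duplicate parsed value.
fun <M : Any, P : Any> Flow<M>.uniqueValueGetter(
    scope: CoroutineScope,
    parse: (M) -> P,
): suspend () -> P {
    val cached = shareIn(scope, SharingStarted.Eagerly, replay = 1)
    var lastMessage: M? = null    // captured state, invisible to callers
    var lastParsed: P? = null
    val getter: suspend () -> P = {
        cached
            .filter { it !== lastMessage }            // same instance: skip parsing
            .map { it to parse(it) }
            .first { (_, parsed) -> parsed != lastParsed }
            .also { (message, parsed) ->
                lastMessage = message
                lastParsed = parsed
            }
            .second
    }
    return getter
}

fun main() = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val next = flowOf("hello").uniqueValueGetter(scope) { it.uppercase() }
    println(next())
    // No new message arrives, so a second call just suspends until the timeout.
    println(withTimeoutOrNull(100) { next() })
    scope.cancel()
}
```

The getter is not safe to call from several coroutines at once, since the captured variables are unsynchronized. A Mutex around the body would be one way to handle that.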

Actually, I do have one more question about this code. The documentation states that collect() never finishes normally when used on a hot flow. This then means that getNextUniqueValue never finishes unless the coroutine is cancelled, does it?

That applies to myFlow, not the Flow that takeWhile returns. When the predicate returns false, the internal collection on myFlow is terminated using an exception.
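
A minimal sketch to illustrate, using a MutableSharedFlow with a replay cache as a stand-in for the hot source (the helper name is made up for the example). collect() on the takeWhile flow returns as soon as the predicate yields false, even though the underlying shared flow never completes.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the values the predicate was called with before collection ended.
fun valuesSeenByTakeWhile(): List<Int> = runBlocking {
    val hot = MutableSharedFlow<Int>(replay = 3)
    hot.emit(1); hot.emit(2); hot.emit(3)

    val seen = mutableListOf<Int>()
    // Collecting `hot` directly would never return; the takeWhile flow
    // completes once the predicate returns false.
    hot.takeWhile { seen += it; it < 2 }.collect()
    seen
}

fun main() = println(valuesSeenByTakeWhile())
```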

Ahh, right - I misread the takeWhile documentation. It states:

Returns a flow that contains first elements satisfying the given predicate.

And I somehow missed the “first” bit, making it seem as if takeWhile sort of behaved like filter.