Feature Request: MutableStateFlow - support equalsPolicy{}

Say I have a “heavy” data class MyHeavyDataClass(...), with many fields, nested fields, or anything else that makes MyHeavyDataClass::equals() slow enough to hurt performance.
Now let’s say I (as the feature developer) want to expose a StateFlow<MyHeavyDataClass>, using MutableStateFlow under the hood.

Let’s also say that I know what I’m emitting, and want to skip the == checks that the state flow performs on every update.

Currently, the best thing I have come up with is:

data class Wrapper(val uniqueId: Int, val data: MyHeavyDataClass)

private var uniqueIdGenerator = 0

// when emitting:
val someDataClassInstance = ... // compute it
_stateFlow.value = Wrapper(uniqueIdGenerator++, someDataClassInstance)

This uses the uniqueId field as a tie-breaker for .equals(), so the state flow won’t get as far as calculating .equals() for the actual heavy data class.

This adds complexity to both the emitting code (which needs to synchronize the id generation to keep the ids unique) and the collecting code (which needs to go through the extra .data field).
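
One way to drop the counter, sketched here under assumptions: if the wrapper is a plain class rather than a data class, it inherits reference equality from Any, so every freshly created wrapper is already "different" and there is no id to keep unique. The names Box, Heavy and heavyEqualsCallsAfterUpdate below are hypothetical and only serve the illustration; collectors would still have to unwrap .data.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical stand-in for the heavy class. It counts equals() calls
// so we can observe whether the state flow ever reaches it.
class Heavy(val id: Int) {
    override fun equals(other: Any?): Boolean {
        equalsCalls++
        return other is Heavy && other.id == id
    }

    override fun hashCode(): Int = id

    companion object {
        var equalsCalls = 0
    }
}

// Deliberately NOT a data class: equals() is inherited from Any,
// so two Box instances are equal only if they are the same object.
class Box<T>(val data: T)

// Updates the flow once and reports how often Heavy.equals() was called.
fun heavyEqualsCallsAfterUpdate(): Int {
    Heavy.equalsCalls = 0
    val state = MutableStateFlow(Box(Heavy(1)))
    state.value = Box(Heavy(1)) // a new Box, so the flow treats it as a new value
    return Heavy.equalsCalls
}

fun main() {
    println(heavyEqualsCallsAfterUpdate())
}
```

The state flow only ever compares the Box instances, so the count should stay at zero even though both payloads would be equal according to Heavy.equals().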


Ideally, I would like something like what Android Jetpack Compose added to its State class (which is basically an observable with a getter for the current value, very similar to StateFlow).

something like this:

fun <T> MutableStateFlow(firstValue: T, equalsPolicy: (T, T) -> Boolean = { old, new -> old == new }): MutableStateFlow<T>

Then I, as the creator of the StateFlow instance, can supply a custom equality check and so control which updates the stateFlow treats as distinct.
In my example I would just say:

val stateFlow = MutableStateFlow(..., equalsPolicy = {_,_ -> false}) // never equals

But I can also see situations where people would use the equals policy to look only at fields like “uuid” or similar, where a given object with a uuid is just a snapshot and the same uuid field on two instances means the same content. The new equalsPolicy{} could support such scenarios as well.
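
For comparison, a minimal sketch of what the uuid scenario looks like today on the collecting side, using the existing distinctUntilChanged(areEquivalent) operator on a plain flow. Snapshot and distinctUuids are hypothetical names for illustration. This does not change what MutableStateFlow itself does when a value is set; it only shows the kind of predicate the proposed equalsPolicy would accept.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical snapshot type: the same uuid is assumed to mean the same content.
data class Snapshot(val uuid: String, val payload: List<Int>)

// Collects a small flow, treating consecutive snapshots with the same uuid as equal.
fun distinctUuids(): List<String> = runBlocking {
    flowOf(
        Snapshot("a", listOf(1)),
        Snapshot("a", listOf(1)), // same uuid: dropped without comparing payloads
        Snapshot("b", listOf(3)),
    )
        .distinctUntilChanged { old, new -> old.uuid == new.uuid }
        .toList()
        .map { it.uuid }
}

fun main() {
    println(distinctUuids())
}
```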

WDYT?

A StateFlow is a type of SharedFlow. Maybe you could use a SharedFlow instead?

It’s a possibility, but I really like the convenience of flow.value (in my feature there should always be a value, and the feature, which is partly legacy code, sometimes needs to access the value immediately).

Having a sharedFlow will solve the “don’t use .equals()” part, but will leave me without .value and without a non-suspending way to get the latest value, so I would need to synchronise (atomically?) the flow’s latest emit with a dedicated field for reading it.
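
One possibly relevant detail, sketched under assumptions: SharedFlow exposes a non-suspending replayCache property, so a MutableSharedFlow created with replay = 1 and BufferOverflow.DROP_OLDEST keeps its latest value readable without a separate field. Payload and latestIdAfterTwoEmits are hypothetical names. The replay cache is empty until the first emission, so an initial value has to be emitted right after creation.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical stand-in for the heavy class.
class Payload(val id: Int)

// Emits two values and reads the latest one back without suspending.
fun latestIdAfterTwoEmits(): Int {
    val shared = MutableSharedFlow<Payload>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    shared.tryEmit(Payload(1)) // plays the role of StateFlow's initial value
    shared.tryEmit(Payload(2)) // no equality check happens here
    return shared.replayCache.last().id
}

fun main() {
    println(latestIdAfterTwoEmits())
}
```

Reading replayCache.last() is still less convenient than .value, and it throws if nothing has been emitted yet, so this is a workaround rather than a replacement.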

I agree. Just as sorting algorithms let us provide custom comparators, we should ideally be able to provide our own “equalators” whenever some mechanism compares data for equality.

As MutableStateFlow is an interface and not that complex, we can try to create our own implementation, either by copying the code from the original implementation or by delegating. For delegating, we could use an item wrapper class with an overridden equals() and then wrap/unwrap items when reading/writing. It’s still not entirely straightforward, as we have multiple functions to override, but it’s not that bad either:

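import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow

// Returns a new, independent MutableStateFlow that decides "same value?" with
// comparator instead of T.equals(). The receiver only supplies the initial value.
fun <T> MutableStateFlow<T>.withComparator(comparator: (T, T) -> Boolean): MutableStateFlow<T> {
    // Every value is stored inside a Wrapper whose equals() delegates to comparator.
    @Suppress("EqualsOrHashCode", "UNCHECKED_CAST")
    class Wrapper(val wrapped: T) {
        override fun equals(other: Any?) = comparator(wrapped, (other as Wrapper).wrapped)
    }
    val flow = MutableStateFlow(Wrapper(value))

    // Delegate every member to the wrapped flow, wrapping on write and unwrapping on read.
    return object : MutableStateFlow<T> {
        override var value
            get() = flow.value.wrapped
            set(value) {
                flow.value = Wrapper(value)
            }

        override suspend fun collect(collector: FlowCollector<T>) = flow.collect { collector.emit(it.wrapped) }
        override val replayCache get() = flow.replayCache.map { it.wrapped }
        override val subscriptionCount get() = flow.subscriptionCount
        // Note: compareAndSet also compares through comparator, so with an
        // always-false comparator it can never succeed.
        override fun compareAndSet(expect: T, update: T) = flow.compareAndSet(Wrapper(expect), Wrapper(update))
        @ExperimentalCoroutinesApi
        override fun resetReplayCache() = flow.resetReplayCache()
        override fun tryEmit(value: T) = flow.tryEmit(Wrapper(value))
        override suspend fun emit(value: T) = flow.emit(Wrapper(value))
    }
}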

Test:

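import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // The comparator never reports two values as equal, so every update is emitted.
    val flow = MutableStateFlow(MyHeavyDataClass())
        .withComparator { _, _ -> false }

    launch {
        // Initial value plus the two updates below.
        flow.take(3).collect { println("Collected!") }
    }

    delay(1000)
    flow.value = MyHeavyDataClass()
    delay(1000)
    flow.value = MyHeavyDataClass()
}

// equals() is loud and slow on purpose, so any call to it would be obvious.
class MyHeavyDataClass {
    override fun equals(other: Any?): Boolean {
        println("Oh no!")
        Thread.sleep(10000)
        return false
    }
}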

It feels a bit hacky, but I think it should work properly for most/all cases.

thanks for the reply!

I didn’t think to try wrapping MutableStateFlow, because the official documentation says: “Not stable for inheritance - The MutableStateFlow interface is not stable for inheritance in 3rd party libraries, as new methods might be added to this interface in the future, but is stable for use.”
I understand this: my library can be stuck on, e.g., kotlinx.coroutines version X while the consumer app imports kotlinx.coroutines version X+something, leaving the consumer app with runtime crashes from the unimplemented methods (an AbstractMethodError on the JVM, or worse; I never tried that).

So ideally this would be supported by the library itself.

But in the meantime I can do something similar, since I’m writing a consumer app, not a library, so thanks a lot!

Ahh, actually I looked for a similar comment, because it feels wrong to override all these functions like resetReplayCache(), etc. Somehow, I missed it. Yeah, I agree, that could be a pain, especially if used in a library.

I like the idea, it gives customizability that is needed in some circumstances. I think it’s worth opening a feature request for this.

An additional note: although an equalsPolicy like the one you described (e.g. (T, T) -> Boolean) is good and necessary for full customizability, I think there should be another layer of customizability in between. A common pattern throughout the Kotlin stdlib and third-party libs is to offer convenience functions that take a (monadic) selector lambda instead of a (dyadic) policy lambda. The sorting functions of the stdlib were already mentioned in the thread, but note how sortedBy takes a selector ((T) -> K) and not a Comparator (that would be sortedWith). Comparison or equality is then delegated to the selected key.

Similarly, I think MutableStateFlow should provide a way to customize equality via a selector lambda, as it covers most use-cases, and additionally via a policy lambda for special use-cases.
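
To make the selector/policy split concrete, here is a small stdlib-only sketch of the sorting precedent mentioned above. User, namesBySelector and namesByComparator are hypothetical names. For flows, kotlinx.coroutines already follows the same split with distinctUntilChangedBy (selector) and distinctUntilChanged with an areEquivalent lambda (policy).

```kotlin
// Hypothetical type for illustration.
data class User(val name: String, val age: Int)

// Selector style: say WHICH key to compare, the library does the comparing.
fun namesBySelector(users: List<User>): List<String> =
    users.sortedBy { it.age }.map { it.name }

// Policy style: say HOW two elements compare, with full control.
fun namesByComparator(users: List<User>): List<String> =
    users.sortedWith(Comparator { a, b -> a.age.compareTo(b.age) }).map { it.name }

fun main() {
    val users = listOf(User("ann", 41), User("bob", 23), User("cy", 35))
    println(namesBySelector(users))
    println(namesByComparator(users))
}
```

Both calls produce the same order here; the selector form is shorter, while the comparator form can express rules that no single key captures.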

A selector can easily be implemented on top of the comparator, so the comparator could be the “main” API implemented by the flow itself, and then we can provide the selector API either by extensions or by something like: MutableStateFlow(user, compareBy { it.name }).
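
A minimal sketch of that layering, assuming the policy has the shape (T, T) -> Boolean proposed earlier in the thread. equalsBy is a hypothetical helper, not an existing API. Note that compareBy returns a Comparator, which describes ordering, so this sketch uses a plain equality lambda instead.

```kotlin
// Hypothetical helper: turns a key selector into an equality policy.
fun <T, K> equalsBy(selector: (T) -> K): (T, T) -> Boolean =
    { old, new -> selector(old) == selector(new) }

// Hypothetical type for illustration.
data class Account(val name: String, val lastSeen: Long)

fun main() {
    // Two accounts count as "the same value" when their names match.
    val sameName: (Account, Account) -> Boolean = equalsBy<Account, String> { it.name }

    println(sameName(Account("ann", 1), Account("ann", 2)))
    println(sameName(Account("ann", 1), Account("bob", 1)))
}
```

With a helper like this, the flow would only need to accept the policy lambda, and the selector form comes for free as an extension.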

Thanks a lot! How does one (me) open a feature request? :pray: