Flows and mutable state

From the official documentation page “Shared mutable state and concurrency”, I understood that Kotlin flows should not be used together with mutable variables, because they’re not thread-safe.

But looking inside the standard library source code I can see that vars are used with flows:

```kotlin
private inline fun <T, K> Flow<T>.distinctUntilChangedBy(
    crossinline keySelector: (T) -> K,
    crossinline areEquivalent: (old: K, new: K) -> Boolean
): Flow<T> =
    flow {
        var previousKey: Any? = NULL
        collect { value ->
            val key = keySelector(value)
            @Suppress("UNCHECKED_CAST")
            if (previousKey === NULL || !areEquivalent(previousKey as K, key)) {
                previousKey = key
                emit(value)
            }
        }
    }
```

So where does the truth lie? In which Flow functions can I use a var or other non-thread-safe data structures?

That documentation starts out with “Coroutines can be executed concurrently”; multiple coroutines running concurrently is the only scenario it’s talking about. The var in distinctUntilChangedBy is only ever accessed by one coroutine, so all the code that touches it runs sequentially: collect processes items one at a time, and the var is never touched by multiple threads at the same time.
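To make this concrete, here is a minimal sketch of a custom operator written in the same style as distinctUntilChangedBy, assuming kotlinx.coroutines is on the classpath. The name `runningTotal` is hypothetical (the library already offers `runningReduce` for this job); the point is that `total` is declared inside the `flow { }` block, so every collection gets its own copy, and only the single coroutine running that collection ever reads or writes it.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits the running sum of the upstream values.
fun Flow<Int>.runningTotal(): Flow<Int> = flow {
    // Created fresh for every collection; only the coroutine running this block touches it.
    var total = 0
    collect { value ->
        // collect delivers values one at a time, so these accesses never overlap.
        total += value
        emit(total)
    }
}

fun main() = runBlocking {
    val totals = flowOf(1, 2, 3, 4).runningTotal()
    println(totals.toList()) // [1, 3, 6, 10]
    // A second collection runs the flow block again, starting from total = 0.
    println(totals.toList()) // [1, 3, 6, 10]
}
```

Because a cold flow re-runs its builder block for each collector, the var is per-collection state rather than shared state, which is why the library can use it without locks.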

And if the coroutine is resumed on a different thread, won’t that cause any trouble? E.g., does the compiler insert memory barriers?

It’s still one coroutine, so there are no race conditions there. It isn’t shared mutable state; it’s mutable state that can only be accessed by one execution of code.
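The difference between the two kinds of state can be sketched as follows, assuming kotlinx.coroutines; the function `sharedVersusLocal` is made up for illustration. `shared` is captured by many coroutines running on `Dispatchers.Default`, which is the shared-mutable-state case the documentation warns about, so it is guarded with a `Mutex`. Each `local` belongs to exactly one coroutine and needs no protection, even though that coroutine may be resumed on different pool threads over its lifetime.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper: runs `workers` coroutines on a thread pool, each incrementing `perWorker` times.
fun sharedVersusLocal(workers: Int, perWorker: Int): Pair<Int, List<Int>> = runBlocking {
    val mutex = Mutex()
    // Captured by every coroutine: genuinely shared mutable state, so it is guarded by the mutex.
    var shared = 0
    val locals = List(workers) {
        async(Dispatchers.Default) {
            // Belongs to this one coroutine: accessed sequentially, no lock needed.
            var local = 0
            repeat(perWorker) {
                local++
                mutex.withLock { shared++ }
            }
            local
        }
    }.awaitAll()
    shared to locals
}

fun main() {
    val (shared, locals) = sharedVersusLocal(workers = 100, perWorker = 1000)
    println(shared)                    // 100000
    println(locals.all { it == 1000 }) // true
}
```

Removing the `withLock` around `shared++` would reintroduce a race on `shared`, while `local` stays correct either way.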

It’s mutable state that can only be accessed by one execution of code

Yeah, but the coroutine can still be executed on different threads, which could lead to problems. For example, something has to ensure that CPU caches are properly flushed to main memory; otherwise each thread could see different data.

No, there won’t be any problems. The stack of a coroutine is restored on each invocation. I.e. the local variables of a coroutine are not stored in the heap, but on the stack of the current thread.

I thought a coroutine is compiled into a class, with the coroutine’s local variables becoming instance variables of that class, plus an instance variable that records the suspension point at which the coroutine was suspended?

I do not know the exact implementation details, but there is a class involved somewhere of course.
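The class-based picture described in the question (a field per local variable plus a field recording the suspension point) can be illustrated with a toy, hand-written model. This is not real compiler output: the generated class extends an internal continuation base class and uses synthetic field names. `SumOfTwoStateMachine` and the suspending `fetch()` it stands in for are hypothetical; each `resume` call plays the role of the coroutine being resumed, possibly by a different thread.

```kotlin
// Toy model (not real compiler output) of a suspend function such as:
//   suspend fun sumOfTwo(): Int { var total = 0; total += fetch(); total += fetch(); return total }
// where fetch() is a hypothetical suspending call.
class SumOfTwoStateMachine {
    var label = 0 // which suspension point to continue from
    var total = 0 // the local `total`, kept in a field so it survives suspension

    // Each call stands for one resumption; `fetched` is the result of the fetch() we were waiting on.
    // Returns null while still "suspended", or the final result once finished.
    fun resume(fetched: Int): Int? = when (label) {
        0 -> { total = 0; label = 1; null }         // start, then suspend in the first fetch()
        1 -> { total += fetched; label = 2; null }  // resumed with the first result, suspend again
        2 -> { total += fetched; label = 3; total } // resumed with the second result, return
        else -> error("coroutine already completed")
    }
}

fun main() {
    val sm = SumOfTwoStateMachine()
    println(sm.resume(0))  // null (the argument is ignored on the first call)
    println(sm.resume(10)) // null
    println(sm.resume(32)) // 42
}
```

In this model the local really does live on the heap between resumptions, so the question of visibility across threads becomes a question of how the resuming thread is handed the object, which the coroutine machinery is responsible for.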

The point is that the coroutine framework takes care of thread synchronization, and you can use local variables as local variables. You do not have to worry about synchronization issues when writing a coroutine. That is what makes them powerful: they are easy to reason about.
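A small experiment using only the Kotlin standard library’s low-level coroutine API (`suspendCoroutine`, `startCoroutine`) is consistent with this: a plain local `var` is incremented across several resumptions, each on a different, newly started thread, and still ends with the expected value. The helpers `hopToNewThread` and `countWhileHopping` are hypothetical names. A passing run is not a proof of memory-model correctness; in this particular setup the guarantee comes from the Java memory model rule that `Thread.start()` happens-before everything the started thread does, combined with the coroutine’s locals being saved into its continuation object before the suspending call.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends the caller and resumes it from a brand-new thread.
suspend fun hopToNewThread(): Unit = suspendCoroutine { cont ->
    thread { cont.resume(Unit) }
}

// Increments a plain local var `steps` times, resuming on a new thread before each increment.
fun countWhileHopping(steps: Int): Pair<Int, Set<String>> {
    var outcome: Result<Pair<Int, Set<String>>>? = null
    val done = CountDownLatch(1)
    val body: suspend () -> Pair<Int, Set<String>> = {
        var counter = 0 // ordinary local var: no @Volatile, no lock, no atomic
        val threadNames = mutableSetOf<String>()
        repeat(steps) {
            hopToNewThread()
            counter++
            threadNames += Thread.currentThread().name
        }
        counter to threadNames
    }
    body.startCoroutine(Continuation<Pair<Int, Set<String>>>(EmptyCoroutineContext) { r ->
        outcome = r
        done.countDown() // countDown/await hand `outcome` to the waiting thread safely
    })
    done.await()
    return outcome!!.getOrThrow()
}

fun main() {
    val (count, threadNames) = countWhileHopping(5)
    println(count)       // 5
    println(threadNames) // distinct auto-generated thread names, one per hop
}
```

With a real dispatcher such as `Dispatchers.Default`, the hand-off goes through the dispatcher’s task queue instead of `Thread.start()`, but the idea is the same: the framework, not your code, provides the synchronization between resumptions.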

That would make sense. But to be honest, I would be much happier if it were written down in the documentation and/or explained in terms of the Java memory model.

I would be thankful if somebody could add such a reference.

You could take a look at this KEEP:

Just a small warning: this was the design document for the implementation of coroutines, and there may have been a few minor changes to coroutines since it was written. However, the basic idea of how coroutines work internally should still be the same.

Thanks, the “Concurrency and threads” section answers my question.

It would be nice if the official tutorial/documentation (https://kotlinlang.org/docs/reference/coroutines/basics.html) mentioned var and mutable data structures as well.

You can also read this TL;DR. Hopefully it’ll dispel some common myths about concurrent access to mutable state: What is “concurrent” access to mutable state? by Roman Elizarov (ProAndroidDev).