Potential value loss in Kotlin's SharedFlow between subscriptions

Hi everyone,

I’ve encountered an issue with Kotlin’s SharedFlow where it seems to lose values between subscriptions. I’ve created a minimal reproducible example to illustrate the problem. In this example, a flow produces values from 10 down to 0 and is shared using the shareIn function.

Here is the code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    runBlocking {
        val context = newSingleThreadContext("BlockingCodeContext")
        val counter = AtomicInteger(10)

        val flow = flow {
            while (currentCoroutineContext().isActive) {
                val value = async(context) {
                    Thread.sleep(500)
                    counter.getAndDecrement().also {
                        println("Produced $it")
                    }
                }.await()
                if (value < 0) break
                println("Emitting $value")
                emit(value)
            }
        }

        val scope = CoroutineScope(context)
        val shared = flow.shareIn(scope, SharingStarted.WhileSubscribed())

        shared.take(1).collect {
            println("Consumed $it")
        }

        shared.takeWhile { it > 0 }.collect {
            println("Consumed $it")
        }

        context.close()
        scope.cancel()
    }
}

Here is what it prints on my laptop:

Produced 10
Emitting 10
Consumed 10
Produced 9
Emitting 9
Consumed 9
Produced 8
Produced 7
Emitting 7
Consumed 7
Produced 6
Emitting 6
Consumed 6
Produced 5
Emitting 5
Consumed 5
Produced 4
Emitting 4
Consumed 4
Produced 3
Emitting 3
Consumed 3
Produced 2
Emitting 2
Consumed 2
Produced 1
Emitting 1
Consumed 1
Produced 0
Emitting 0
Produced -1

Process finished with exit code 0

In the output, the value 8 is produced but never emitted or consumed. I believe this happens because the coroutine responsible for sharing is cancelled, so the flow does not proceed past the await() call.

Could someone advise on how to ensure that all values are properly shared between subscriptions?

I’m relatively new to coroutines, so if there’s a better overall approach to achieve behavior similar to SharedFlow, I’m all for it. Any guidance would be greatly appreciated!

Thanks in advance for your help!

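It is hard to recommend anything, as it is not clear what you are trying to achieve. You stop observing the flow for a moment, so the producer is stopped: with SharingStarted.WhileSubscribed(), the upstream flow is cancelled once the last subscriber leaves and started again when a new one appears.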
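
To make the stop/restart visible, here is a minimal sketch (not your code, just an illustration). The helper observeRestart, the starts counter and the stopped channel are names I made up for the demo, and delay(10) stands in for your blocking work. It subscribes twice and counts how often the upstream flow body is entered:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper for illustration: returns how many times the upstream
// was started, plus the first value seen by each of two subscriptions.
fun observeRestart(): Triple<Int, Int, Int> = runBlocking {
    val starts = AtomicInteger(0)
    // Signals that an upstream run has ended (here: because it was cancelled).
    val stopped = Channel<Unit>(Channel.UNLIMITED)

    val upstream = flow {
        starts.incrementAndGet()
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10) // stands in for the slow producer
            }
        } finally {
            stopped.trySend(Unit)
        }
    }

    val scope = CoroutineScope(Dispatchers.Default)
    val shared = upstream.shareIn(scope, SharingStarted.WhileSubscribed())

    val first = shared.first()   // subscriber count goes 1 -> 0 afterwards
    stopped.receive()            // wait until the upstream has been cancelled
    val second = shared.first()  // a new subscriber restarts the upstream

    scope.cancel()
    Triple(starts.get(), first, second)
}

fun main() {
    val (starts, first, second) = observeRestart()
    println("upstream starts=$starts, first=$first, second=$second")
}
```

The upstream body is entered twice and both subscriptions see 0 as their first value, i.e. the second subscription gets a fresh run of the flow rather than a continuation of the first. In your example the counter lives outside the flow, so whatever the cancelled run had already taken from it (the 8) is simply gone.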

If you need to coordinate some work between multiple consumers, then I think flows aren’t really suitable here. Use channels instead.
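
A rough sketch of what I mean, under my own assumptions about your goal (every value should be handed to exactly one consumer, and nothing should be dropped while no one is receiving). The name runDemo is hypothetical, and delay(50) again replaces the blocking call. With a rendezvous channel, send suspends until someone receives, so the producer just waits between consumers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: two consecutive consumers read from one channel.
fun runDemo(): List<Int> = runBlocking {
    val consumed = mutableListOf<Int>()
    val channel = Channel<Int>() // rendezvous channel, no buffer

    val producer = launch {
        for (value in 10 downTo 0) {
            delay(50)           // stands in for the slow, blocking work
            channel.send(value) // suspends until a consumer receives it
        }
        channel.close()
    }

    // First consumer: take a single value.
    consumed += channel.receive()

    // Second consumer: continues exactly where the first one stopped.
    for (value in channel) {
        if (value <= 0) break
        consumed += value
    }

    producer.cancel() // no-op if the producer has already finished
    consumed
}

fun main() {
    println(runDemo()) // [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
}
```

Note the trade-off: a channel delivers each value to a single receiver, whereas a SharedFlow broadcasts to all current subscribers. If you actually need broadcasting plus "no gaps", you will have to decide what should happen to values while nobody is subscribed.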