Hi everyone,
I’ve run into an issue with Kotlin’s SharedFlow: it seems to lose values between subscriptions. I’ve put together a minimal reproducible example to illustrate the problem. In it, a flow produces values from 10 down to 0 and is shared using the shareIn function.
Here is the code:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    runBlocking {
        val context = newSingleThreadContext("BlockingCodeContext")
        val counter = AtomicInteger(10)
        val flow = flow {
            while (currentCoroutineContext().isActive) {
                // async resolves against the enclosing runBlocking scope here
                val value = async(context) {
                    Thread.sleep(500)
                    counter.getAndDecrement().also {
                        println("Produced $it")
                    }
                }.await()
                if (value < 0) break
                println("Emitting $value")
                emit(value)
            }
        }
        val scope = CoroutineScope(context)
        val shared = flow.shareIn(scope, SharingStarted.WhileSubscribed())
        // First subscription: take a single value
        shared.take(1).collect {
            println("Consumed $it")
        }
        // Second subscription: take the remaining positive values
        shared.takeWhile { it > 0 }.collect {
            println("Consumed $it")
        }
        context.close()
        scope.cancel()
    }
}
Here is what it prints on my laptop:
Produced 10
Emitting 10
Consumed 10
Produced 9
Emitting 9
Consumed 9
Produced 8
Produced 7
Emitting 7
Consumed 7
Produced 6
Emitting 6
Consumed 6
Produced 5
Emitting 5
Consumed 5
Produced 4
Emitting 4
Consumed 4
Produced 3
Emitting 3
Consumed 3
Produced 2
Emitting 2
Consumed 2
Produced 1
Emitting 1
Consumed 1
Produced 0
Emitting 0
Produced -1
Process finished with exit code 0
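In the output, the value 8 is produced but never emitted or consumed. I believe this happens because the coroutine responsible for sharing is cancelled while it is suspended in await(), so the flow never proceeds past that call, even though the async block still runs to completion and decrements the counter.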
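The suspected mechanism can be isolated in a smaller sketch that does not involve SharedFlow at all. It assumes the loss comes from cancelling only the coroutine that awaits, while the async block (started in the outer runBlocking scope, as in the example above) keeps running. The names demo, gate, producer and consumer are hypothetical and exist only for this illustration; the gate replaces Thread.sleep so the ordering is deterministic.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns (value produced by the async, counter afterwards).
fun demo(): Pair<Int, Int> = runBlocking {
    val counter = AtomicInteger(10)
    val gate = CompletableDeferred<Unit>() // holds the producer back until we open it

    // Started in runBlocking's scope, so it is NOT a child of the awaiting coroutine.
    val producer = async(Dispatchers.Default) {
        gate.await()
        counter.getAndDecrement()
    }

    // Stands in for the sharing coroutine; UNDISPATCHED runs it up to await() right away.
    val consumer = launch(start = CoroutineStart.UNDISPATCHED) {
        val value = producer.await()
        println("Emitting $value") // not reached: await() is cancelled first
    }

    consumer.cancelAndJoin() // cancel only the awaiting side
    gate.complete(Unit)      // the producer now runs to completion anyway

    producer.await() to counter.get()
}

fun main() {
    val (produced, remaining) = demo()
    println("Produced $produced, counter is now $remaining")
    // prints "Produced 10, counter is now 9"
}
```

The value 10 is taken from the counter although nothing was ever "emitted", which matches the shape of the lost 8 in the output above. Whether this is the whole story for the shareIn case is an assumption; the timing of WhileSubscribed stopping the upstream is not modelled here.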
Could someone advise on how to ensure that all values are properly shared between subscriptions?
I’m relatively new to coroutines, so if there’s a better overall approach to achieving SharedFlow-like behavior, I’m all for it. Any guidance would be greatly appreciated!
Thanks in advance for your help!