I have an NPE when closing a SubscriptionReceiveChannel of a BroadcastChannel inside a consumeEach block. I’m writing about this here because I don’t know if this is by design or if I misunderstand something.
Example:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = BroadcastChannel<Int>(1)
launch(CommonPool) {
for (x in 1..5) {
delay(100L)
channel.send(x)
}
channel.close()
}
val sub = channel.openSubscription()
sub.consumeEach {
println(it)
if (it == 2) {
sub.close()
// Adding return@consumeEach here doesn't help
}
}
println("Done!")
}
A workaround I found is to use for..in
with an explicit break
statement. In the example above replacing the consumeEach:
for (value in sub) {
println(value)
if (value == 2) {
sub.close()
break // will get an NPE as well without this line
}
}
Is this a known issue? Is this behavior fine? I can understand the need for a break statement in the for…in solution. consumeEach
should check whether a channel is closed though, or not?