Close SubscriptionReceiveChannel inside consumeEach


#1

I get an NPE when I close a SubscriptionReceiveChannel of a BroadcastChannel inside a consumeEach block. I’m posting here because I don’t know whether this is by design or whether I’m misunderstanding something.

Example:

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = BroadcastChannel<Int>(1)
    launch(CommonPool) {
        for (x in 1..5) {
            delay(100L)
            channel.send(x)
        }
        channel.close()
    }
    val sub = channel.openSubscription()
    sub.consumeEach {
        println(it)
        if (it == 2) {
            sub.close()
            // Adding return@consumeEach here doesn't help
        }
    }
    println("Done!")
}

A workaround I found is to use a for...in loop with an explicit break statement. In the example above, replace the consumeEach call with:

for (value in sub) {
    println(value)
    if (value == 2) {
        sub.close()
        break // without this break, the loop throws an NPE as well
    }
}

Is this a known issue, or is this behavior intended? I can understand the need for a break statement in the for...in version, but shouldn't consumeEach check whether the channel has been closed?
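
To make that expectation concrete, here is a minimal sketch in plain Kotlin of the behavior I would expect. ToySubscription and consumeEachChecked are hypothetical names made up for this illustration; they are not part of kotlinx.coroutines, and the sketch does not claim to show how the library is implemented or what causes the NPE. It only shows a consumeEach-style loop that re-checks the closed state before every element, so closing inside the block ends the loop cleanly.

```kotlin
// Hypothetical stand-in for a subscription; NOT the kotlinx.coroutines implementation.
class ToySubscription<T>(items: List<T>) {
    private val queue = ArrayDeque(items)

    var isClosed = false
        private set

    fun close() {
        isClosed = true
    }

    // Returns the next item, or null once the subscription is closed or drained.
    fun pollOrNull(): T? = if (isClosed) null else queue.removeFirstOrNull()
}

// Hypothetical consumeEach-like helper: the closed flag is consulted
// (via pollOrNull) before every element is handed to the action.
inline fun <T : Any> ToySubscription<T>.consumeEachChecked(action: (T) -> Unit) {
    while (true) {
        val item = pollOrNull() ?: break
        action(item)
    }
}

fun collectUntilTwo(): List<Int> {
    val sub = ToySubscription(listOf(1, 2, 3, 4, 5))
    val seen = mutableListOf<Int>()
    sub.consumeEachChecked {
        seen += it
        if (it == 2) sub.close() // closing inside the block stops the loop
    }
    return seen
}

fun main() {
    println(collectUntilTwo()) // prints [1, 2]
    println("Done!")
}
```

With this toy model, the elements after 2 are never delivered and no break or return is needed inside the block, which is what I expected from the real consumeEach.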


#2

Thanks for the report. Please file this as an issue in the kotlinx.coroutines GitHub project: https://github.com/Kotlin/kotlinx.coroutines/issues


#3

Done, I filed the issue on GitHub.