Why are downstream exceptions propagated upwards in Flow?

I’m reading the guide on Flow and it says,

Flows must be transparent to exceptions and it is a violation of the exception transparency to emit values in the flow { ... } builder from inside of a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.

This is a bit hard to understand, but if you actually try to use try/catch in the emitter, you can see the issue (play.kotlinlang):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class FunnyException : Exception()

fun main() = runBlocking<Unit> {
    try {
        flow { 
            emit(0)
            try {
                emit(1)
            } catch (e: FunnyException) {
                emit(2)
            }
        }.onEach {
            if (it == 1) throw FunnyException()
        }.collect {
            println(it)
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

This produces:

0
Caught java.lang.IllegalStateException: Flow exception transparency is violated:
    Previous 'emit' call has thrown exception FunnyException, but then emission attempt of value '2' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.

I guess in this particular piece of code it makes sense that the emitter shouldn’t try to emit after it receives an exception. What I don’t see is why the emitter receives FunnyException in the first place. What useful things can it do with this exception?

I expected that the exception thrown in onEach would be propagated downwards instead, and that a CancellationException would be thrown inside the flow {} block. Instead, that block can either produce the above error or completely suppress the exception (try commenting out emit(2)). In this case the program only prints “0”. This is not disallowed even though it seems Kotlin could detect it; what would be the purpose of such code? Doesn’t this also break the “exception transparency”?

P.S. Why is CancellationException an Exception? Wouldn’t it make sense to extend directly from Throwable so that it is not inadvertently caught by catch (e: Exception)?

Basically this just means that if emit fails with an exception, then the whole collection should fail with that same exception.

Exceptions always propagate up to the caller. It’s the signal that the requested code failed. That exception is indicating that emit failed. This happens because emit eventually calls the onEach lambda that throws the exception.
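To make the "emit eventually calls the onEach lambda" point concrete, here is a minimal sketch of a toy flow. TinyCollector, TinyFlow and runDemo are hypothetical names made up for illustration, and everything is non-suspending for simplicity, so this is not how kotlinx.coroutines is actually implemented. It only shows that when collect runs the producer block with the consumer as its receiver, an exception thrown by the consumer surfaces at the emit call site and then travels up to whoever called collect.

```kotlin
// Hypothetical, non-suspending stand-ins for FlowCollector and Flow.
fun interface TinyCollector<T> {
    fun emit(value: T)
}

class TinyFlow<T>(private val block: TinyCollector<T>.() -> Unit) {
    // collect just runs the producer block with the consumer as its receiver,
    // so every emit(...) in the block is a plain call into the consumer.
    fun collect(collector: TinyCollector<T>) = collector.block()
}

class FunnyException : Exception()

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val flow = TinyFlow<Int> {
        emit(0)
        try {
            emit(1)
        } catch (e: FunnyException) {
            // The consumer's exception arrives here because emit(1) called the consumer.
            log += "producer saw the exception at emit(1)"
            throw e // rethrowing unchanged keeps the producer transparent
        }
    }
    try {
        flow.collect { value ->
            if (value == 1) throw FunnyException()
            log += "collected $value"
        }
    } catch (e: FunnyException) {
        log += "caller caught FunnyException"
    }
    return log
}

fun main() {
    runDemo().forEach(::println)
}
```

The producer only observes the exception on its way up the call stack; as long as it rethrows it unchanged, the caller of collect sees exactly what the consumer threw.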

The guide is saying that you aren’t allowed to do anything with the exception.

You can only throw an exception up to the caller. There’s not really another direction to throw it. If emit fails, the goal is to pass that exception (not some different exception) up to the code calling collect.

Flow is just a regular Kotlin class defined in a Kotlin library. It can’t do anything special. The IDE could potentially detect this case and give a warning. You could file an Issue.

Because Flow isn’t special and Kotlin has no way to require a lambda or method to be transparent to exceptions, it’s up to you to not break exception transparency.
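For reference, the error message in the question points at the Flow.catch operator as the supported alternative. Below is a small sketch of the original example rewritten that way, assuming kotlinx.coroutines is on the classpath; collectWithCatch is a hypothetical helper name. Note that catch only sees exceptions from the operators above it, so it is placed after onEach here.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

class FunnyException : Exception()

// Hypothetical helper: collects the flow into a list so the result is easy to inspect.
fun collectWithCatch(): List<Int> = runBlocking {
    flow {
        emit(0)
        emit(1) // no try/catch around emit: the builder stays transparent
    }.onEach {
        if (it == 1) throw FunnyException()
    }.catch { e ->
        // Runs only for exceptions thrown upstream of this operator.
        if (e is FunnyException) emit(2) else throw e
    }.toList()
}

fun main() {
    println(collectWithCatch()) // [0, 2]
}
```

The value 1 never reaches the collector because onEach throws before passing it on; the fallback value 2 is emitted by catch instead.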

CancellationException comes straight from Java, it’s not unique to coroutines.


Aha, thanks, I think I’m getting it. Correct me if I’m wrong here:

So I was thinking about Flow in terms of Python’s asynchronous generators. In Python, you would do something like

>>> async def emitter():
...     yield 1
...     yield 2
...
>>> async for x in emitter():
...     print(x)
1
2

Kotlin’s Flow looks similar, but unlike sequence {}, it does not involve iterators:

flow {
    emit(1)
    emit(2)
}.collect {
    println(it)
}

The key difference here is that in Python the steering wheel remains in the top level code, while in Kotlin it’s passed to the Flow block. In other words, in Python the part that is in charge is the async for loop; it requests stuff from emitter and then passes it to the body. In Kotlin, on the other hand, running collect will launch the flow block, which calls emit, which in turn calls the downstream code more or less directly.

This lets Python catch the exception more directly. You can simply do:

try:
    await anext(emitter_obj)
except MyException:
    ...

But in Kotlin this code wouldn’t be in charge. What you can do instead is wrap the whole flow {}.collect {} in a try/catch, which is roughly what .catch {} does. It also has to check that the exception doesn’t come from downstream, so it’s a bit more complicated than that.
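For contrast, Kotlin's own sequence {} follows the pull model described above: the consumer drives the iterator, so an exception thrown in the consumer's loop body never passes through the producer. A minimal standard-library sketch (pullDemo is a hypothetical name used only for this illustration):

```kotlin
fun pullDemo(): List<String> {
    val log = mutableListOf<String>()
    val iterator = sequence<Int> {
        try {
            yield(1)
            yield(2)
        } catch (e: Exception) {
            // Not reached in this demo: the consumer's exception is not thrown through yield.
            log += "producer saw an exception"
        }
    }.iterator()

    try {
        while (iterator.hasNext()) {
            val value = iterator.next()
            // Thrown in the consumer's own frame; the producer is merely suspended at yield(1).
            if (value == 1) throw IllegalStateException("consumer failed")
            log += "consumed $value"
        }
    } catch (e: IllegalStateException) {
        log += "consumer caught its own exception"
    }
    return log
}

fun main() {
    pullDemo().forEach(::println)
}
```

Here the producer's catch block never runs, which is the behaviour the Python comparison expects and the opposite of what happens with emit in a flow.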

CancellationException comes straight from Java, it’s not unique to coroutines.

Why not make a new thing?

Haven’t used coroutines in Python but that reminds me of use cases that rely on Channels.

With a Channel, you can receive just the next item at a time. Exceptions work more like you were expecting since the code producing values is just adding to the Channel instead of invoking the consumer code directly.

So why would anyone use Flow when there’s Channel?

Flow is more versatile (it waits to run until you collect), more efficient (direct calls are faster than communicating with a Channel), and simpler to use (nothing to close unlike a Channel).

If needed, any Flow can be consumed with a Channel by calling produceIn. Or vice versa, a Channel can be turned into a Flow with receiveAsFlow or consumeAsFlow.
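A small sketch of converting in both directions, assuming kotlinx.coroutines is available. roundTrip is a hypothetical name; receiveAsFlow is used for the channel-to-flow direction, and the opt-in annotation is there only in case produceIn is still marked as preview in the library version being used.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: Flow -> Channel -> Flow -> List.
@OptIn(FlowPreview::class)
fun roundTrip(): List<Int> = runBlocking {
    // produceIn starts a coroutine in this scope that collects the flow into a channel.
    val channel = flowOf(1, 2, 3).produceIn(this)
    // receiveAsFlow exposes the channel as a flow again; it completes when the channel closes.
    channel.receiveAsFlow().toList()
}

fun main() {
    println(roundTrip()) // [1, 2, 3]
}
```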

Flow is generally preferred when you are representing, manipulating, and combining streams of data.

Note that even with Channel, I think the semantics will be different from what you are used to. Channel is an async queue, not an async iterator. The producer doesn’t wait for the value to be requested, it starts right away.
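Here is a rough sketch of that difference, assuming kotlinx.coroutines and a single-threaded runBlocking event loop. eagerVersusLazy is a hypothetical name, and the yield() call is only there to give the producer coroutine a chance to run before anything is requested.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

@OptIn(ExperimentalCoroutinesApi::class)
fun eagerVersusLazy(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // The channel producer is a coroutine of its own and starts without being asked.
    val channel = produce<Int> {
        log += "channel producer started"
        send(1) // suspends here until someone receives
    }

    // The flow block is only a recipe; nothing runs until it is collected.
    val lazyFlow = flow<Int> {
        log += "flow producer started"
        emit(1)
    }

    yield() // let the already-launched producer coroutine run
    log += "nothing requested yet"

    channel.receive()
    lazyFlow.toList()
    log
}

fun main() {
    eagerVersusLazy().forEach(::println)
}
```

With the default rendezvous channel the producer runs up to its first send and waits there, whereas the flow block does not start until toList collects it.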

I’m not aware of an actual async iterator in Kotlin’s coroutine library, though I can’t really think of a use case where it’d be preferable, so I’m not surprised.

I imagine the Kotlin team saw reusing the existing exception as more beneficial than creating a second exception for basically the exact same purpose.

Channels are still not quite the same, as the producer still has control, doesn’t it? If the consumer of the channel takes 3 elements, the producer will go ahead and make the fourth one. It will stop after, but that’s not quite what we want with lazy iterators…

…So for the sake of learning this whole business I went ahead and tried implementing an asynchronous sequence. My idea was to make it behave exactly like sequence {}:

runBlocking {
    val seq = asyncSequence {
        yield(1)
        delay(100)
        yield(2)
    }

    for (i in seq) println(i)
}

(This is just a proof of concept made for fun. In all likelihood not thread-safe. This has one cool feature: if the generator is hot, the finally block after yield is always executed!)

While it works, asyncSequence is rather useless. It produces an “iterator” that’s suitable for the for loop, but it doesn’t implement the actual Iterator interface because of the suspending functions, so the for loop is about the only place where it can be used. But I’m still proud of this.

the code behind asyncSequence (native):

package asynciterator

import kotlinx.coroutines.*
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.*
import kotlin.experimental.ExperimentalTypeInference
import kotlin.system.getTimeMillis


interface AsyncCoroutineScope<T> : CoroutineScope {
    suspend fun yield(value: T)
}


interface AsyncIterator<T> {
    operator fun iterator() = this
    suspend operator fun hasNext(): Boolean
    suspend operator fun next(): T
}


private sealed class Status<T>
private class NotReady<T> : Status<T>()
private class Ready<T>(val value: T) : Status<T>()
private class Done<T> : Status<T>()
private class Failed<T>(val exception: Throwable) : Status<T>()


class AsyncIteratorImpl<T>(
    override val coroutineContext: CoroutineContext
) : AsyncIterator<T>, AsyncCoroutineScope<T>, Continuation<Unit> {
    private lateinit var mainContinuation: Continuation<Status<T>>
    internal lateinit var sequenceContinuation: Continuation<Unit>

    private var status: Status<T> = NotReady()

    override operator fun iterator() = this

    private suspend fun ensureNext() {
        if (status is NotReady) {
            status = suspendCoroutine {
                mainContinuation = it
                sequenceContinuation.resume(Unit)
            }
            status.let { if (it is Failed) throw it.exception }
        }
    }

    private fun throwBadStatus(): Nothing {
        when (val status = status) {
            is Done -> throw NoSuchElementException("Asynchronous iterator is exhausted")
            is Failed -> throw IllegalStateException("Asynchronous sequence block previously threw an exception", status.exception)
            else -> throw IllegalStateException("This ain't possible")
        }
    }

    override suspend operator fun hasNext(): Boolean {
        ensureNext()
        if (status is Done) return false
        if (status is Ready) return true
        throwBadStatus()
    }

    override suspend operator fun next(): T {
        ensureNext()
        status.let { if (it is Ready) return it.value.also { status = NotReady() } }
        throwBadStatus()
    }

    override suspend fun yield(value: T) = suspendCancellableCoroutine<Unit> {
        sequenceContinuation = it
        mainContinuation.resume(Ready(value))
    }

    // The following is the completion continuation. resumeWith here is called when the coroutine
    // either runs to the very end or throws an exception; this includes CancellationException.
    override val context = coroutineContext

    override fun resumeWith(result: Result<Unit>) {
        val exception = result.exceptionOrNull()
        if (exception is CancellationException) return
        mainContinuation.resume(if (exception != null) Failed(exception) else Done())
    }
}


@OptIn(ExperimentalTypeInference::class)
fun <T> CoroutineScope.asyncSequence(@BuilderInference block: suspend AsyncCoroutineScope<T>.() -> Unit): AsyncIterator<T> {
    return AsyncIteratorImpl<T>(coroutineContext).apply {
        sequenceContinuation = block.createCoroutineUnintercepted(this, this)
    }
}

a few tests:

@file:Suppress("ControlFlowWithEmptyBody")

package asynciterator

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.math.absoluteValue
import kotlin.system.measureTimeMillis
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue


fun CoroutineScope.makeSequence(delayDuration: Long = 0) = asyncSequence {
    for (x in 1..5) {
        delay(delayDuration)
        yield(x)
    }
}


@Suppress("DIVISION_BY_ZERO")
fun CoroutineScope.makeFailingSequence() = asyncSequence {
    for (x in makeSequence()) yield(x)
    1 / 0
}


@Test fun sum() = runBlocking {
    var sum = 0
    for (x in makeSequence()) sum += x
    assertTrue { sum == 15 }
}

@Test fun concurrency() {
    val single = measureTimeMillis {
        runBlocking {
            launch { for (x in makeSequence(100)) {} }
        }
    }

    val double = measureTimeMillis {
        runBlocking {
            launch { for (x in makeSequence(100)) {} }
            launch { for (x in makeSequence(100)) {} }
        }
    }

    assertTrue { (double - single).absoluteValue.toDouble() / double < 0.1 }
}

@Test fun throws() = runBlocking<Unit> {
    assertFailsWith(ArithmeticException::class) {
        for (x in makeFailingSequence()) {}
    }
}

@Test fun `throws if next() is called again`() = runBlocking<Unit> {
    val seq = makeFailingSequence()

    assertFailsWith(ArithmeticException::class) {
        for (x in seq) {}
    }

    val e = assertFailsWith(IllegalStateException::class) {
        for (x in seq) {}
    }

    assertTrue { e.cause is ArithmeticException }
}

@Test fun `doesn't automatically proceed after yield()`() = runBlocking<Unit> {
    for (x in makeFailingSequence()) {
        if (x == 5) break
    }
}

@Test fun `runs finalizer`() {
    var finalizerRun = false

    runBlocking {
        val seq = asyncSequence {
            try {
                for (x in 1..5) yield(x)
            } finally {
                finalizerRun = true
            }
        }

        for (x in seq) {
            if (x == 3) break
        }
    }

    assertTrue { finalizerRun }
}