Coroutine Producer Cancellation

I have problems understanding the cancellation behavior with blocking APIs. Let me start with an easy non-blocking example that I do understand:

fun execute(job: Job) = produce<String>(CommonPool + job) {
    println("a")
    send("Start")
    println("b")
    delay(1000L) // <--- will exit here
    println("c")
    send("Delay done")
    println("d")
    send("Task done")
    println("e")
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job()
    execute(job).consumeEach {
        println(it)
        job.cancel()
    }
}

This prints:

a
b
Start

Which is what I expect. I often have to use blocking APIs though, so let’s replace delay with Thread.sleep in the above example. This will print:

a
b
Start
c
d
Delay done

As expected, the blocking Thread.sleep can’t be cancelled which is fine. But why does it execute send("Delay done") and event print d?

I’d expect that the send would not execute and that the console would look like this:

a
b
Start
c

What is the intention behind this behavior?

1 Like

This is an intended behavior. Suspending functions in kotlinx.coroutines are cancellable only when they do suspend. In your second example consumer was already ready to receive an element so this send invocation did execute without suspension and thus ignored the cancellation, however the subsequent attempt to send had to suspend and thus got cancelled.

This behavior is document as a part of send function documentation: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html

This suspending function is cancellable. If the Job of the current coroutine is cancelled or completed while this function is suspended, this function immediately resumes with CancellationException.

Conceptually, cancellation is a cooperative behavior. Coroutines choose the points at which they can be cancelled and the default implementation is to check for cancellation at suspension only. This is mostly done for performance reasons to avoid extra “am I cancelled” checks in “fast path” when there is no suspension.

I see. I was aware of cancellation being cooperative but thought I could rely on send in supporting me always in this regard. But – if I understood you correctly – this is not the case if a “fast path” without actual suspension is chosen.

This means that cancellation behaviour might change due to a runtime optimization, right? My first impression is that this might be a bit confusing. My first reaction is that I won’t be relying on any suspension point to catch cancellation automatically anymore…

I’m not sure if Kotlin should either check for cancellation always or never to make this conceptually simpler.

Anyways, thanks for the help. Will think about it some more.

This behavior cannot easily change, as it is the part of send’s specification and there are quite a few tests that actually check that it precisely adheres to it.

I understand your position but this is problematic from a user’s perspective.

Usually, suspension points are good candidates for potential “cancellation exits”. I can’t rely on them, though. To simplify the understanding of my own code I’m considering something along these lines:

// A convenience extension function
fun Job.throwIfCancelled() {
    if (this.isCancelled) {
        throw CancellationException()
    }
}

fun execute(job: Job) = produce<String>(CommonPool) { // Note: Job not part of context
    println("a")
    send("Start")
    job.throwIfCancelled() // <-- Check
    println("b")
    Thread.sleep(1000L)
    println("c")
    job.throwIfCancelled() // <-- Check
    send("Delay done")
    println("d")
    job.throwIfCancelled() // <-- Check
    send("Task done")
    println("e")
}

Reading the code above I know exactly when this might cancel. Note, that the behaviour is also slightly different because the CancellationException won’t be caught by the producer. I find this useful though because the consumer usually has to react to this.

I’m still new to these coroutines so I’d be interested in how other people do this.