Misleading coroutine behavior

For those who do not wish to spend time reading all thread here is a small recap:

First point:

Coroutine API suspendCoroutine function can’t be resumed immediately after calling its continuation.resume method up until it finishes execution of its body.

Second point:

ContinuationInterceptor doesn’t intercept a continuation of a suspension that has been resumed in the thread it had been started.

The original question was the following:


In the following sample suspendCoroutine function (Suspension) in exec2 sequentially executes its body code within some Coroutine exec1. But at some moment before Suspension execution is finished other code from another thread calls this Suspension continuation.resume. What’s expected is the Suspension to be resumed as soon as its continuation.resume is called. Thereby resuming the Coroutine at the same moment. Although what’s observed is the Suspension continues execution of its body and resumes Coroutine only after it’s done.

fun main(args: Array<String>) {
	exec1 {
		exec2()
	}
}

fun exec1(code: suspend () -> Unit) {
	code.startCoroutine(object : Continuation<Unit>{
		override val context=EmptyCoroutineContext
		override fun resume(value: Unit) = println("exec1 resume")
		override fun resumeWithException(exception: Throwable) = println("exec1 resumeWithException $exception")
	})
}

suspend fun exec2(): Unit = suspendCoroutine { cont->
	println("exec2 started")
	thread {
		println("exec2 canceled")
		cont.resumeWithException(Exception("oops"))
	}
	Thread.sleep(1000)
	println("exec2 finished")
//	cont.resume(Unit)
}

This prints:

exec2 started
exec2 canceled
exec2 finished
exec1 resumeWithException java.lang.Exception: oops

But expected is:

exec2 started
exec2 canceled
exec1 resumeWithException java.lang.Exception: oops
exec2 finished

Why? Because a code within a suspendCoroutine function can be another long-running Coroutine which can decide by itself whether to run its code in current or parallel thread. And as long as it’s the long-running code it may be liable to cancellation at some point before it’s finished. Although with such Suspension behavior it’s impossible to actually cancel its body execution when it’s running in the thread it had been started in.
May be authors have a strong reason for that, which i’d like to know. Otherwise why not make a Suspension resumable at the moment of resumption not the moment it’s done executing its body.


The second point:

In the following sample i’m trying to track current Coroutine by intercepting it. Interceptor intercepts resumption after suspension only when Interceptor jumps to another thread (runner is not null). Otherwise (runner is null) if coroutine is going to be resumed after suspension in the same thread, Interceptor does not intercept resumption. Which make Interceptor useful only for jumping to threads but useless as the the general tool for actual interception of resumptions.

fun main(args: Array<String>) {
	coroutine {
		suspend { 1 }
	}
	runner?.awaitTermination(1, TimeUnit.SECONDS)
	runner?.shutdown()
}

val runner: ExecutorService? = /*null*/Executors.newFixedThreadPool(2)
var IDS = 0
val tracker = Tracker()

fun coroutine(code: suspend () -> Unit) {
	code.startCoroutine(Coroutine(IDS++, null))
}

suspend fun <T> suspend(code: suspend () -> T): T = suspendCoroutine { cont ->
	code.startCoroutine(Coroutine(IDS++, cont))
}

class Coroutine<T>(val id: Int, val cont: Continuation<T>?): Continuation<T> {
	override val context = tracker + Interceptor()
	override fun resume(value: T) {
		println("resume $this:  current: ${tracker.current}")
		cont?.resume(value)
	}
	override fun resumeWithException(exception: Throwable) {
		println("resumeWE $this:  current: ${tracker.current}")
		cont?.resumeWithException(exception)
	}
	override fun toString() = "[Coroutine $id]"
	
	inner class Interceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
		override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
			override val context = continuation.context
			override fun resume(value: T) {
				tracker.intercept(this@Coroutine)
				runner?.execute { continuation.resume(value) } ?: continuation.resume(value)
			}
			override fun resumeWithException(exception: Throwable) {
				tracker.intercept(this@Coroutine)
				runner?.execute { continuation.resumeWithException(exception) } ?: continuation.resumeWithException(exception)
			}
		}
	}
}

class Tracker: AbstractCoroutineContextElement(Tracker) {
	companion object Key: CoroutineContext.Key<Tracker>
	
	var current: Coroutine<*>? = null
	fun intercept(c: Coroutine<*>) {
		current = c
		println("intercepted current: $current")
	}
}

Prints if runner is not null (each coroutine executes in its own thread):

intercepted current: [Coroutine 0]
intercepted current: [Coroutine 1]
resume [Coroutine 1]: current: [Coroutine 1]
intercepted current: [Coroutine 0]
resume [Coroutine 0]: current: [Coroutine 0]

State is consistent since current Coroutine corresponds to resumend one. See the last line.

Prints if runner is null (execution is in the same thread):

intercepted current: [Coroutine 0]
intercepted current: [Coroutine 1]
resume [Coroutine 1]: current: [Coroutine 1]
resume [Coroutine 0]: current: [Coroutine 1]

State is NOT consistent since current Coroutine does not correspond to resumend one. See the last line. And note the absence of line 4 from previous print.

Why not make Interceptor intercept every resumption, not only those which occurred in another thread?

2 Likes

suspendCoroutine is a low-level primitive that is designed for conversion of callbacks into suspending functions. You should not use suspendCoroutine in your high-level code, precisely because invocation of suspendCoroutine exposes coroutine implementation details (a Continuation object) and can break the illusion of sequential execution in the way you’ve shown.

In particular, you should not mix your business-logic with suspendCoroutine invocations. Your high-level code should never work with Continuation objects directly. Continuation is exposed in Kotlin via suspendCoroutine only for the purpose of implementing low-level suspending functions.

Let us rewrite exec2 from your first example with this principle in mind:

suspend fun exec2() {
    println("exec2 started")
    exec2Cancel()
    Thread.sleep(1000)
    println("exec2 finished")
}

suspend fun exec2Cancel() = suspendCoroutine<Unit> { cont ->
    thread {
        println("exec2 canceled")
        cont.resumeWithException(Exception("oops"))
    }
}

Now exec2 does not use suspendCoroutine and it indeed executes sequentially just as it is supposed to. In particular, because exec2Cancel suspending function asynchronously completes with exception, the invocation of exec2 also completes with exception, never reaching “exec2 finished” line.

Because a code within a suspendCoroutine function can be another long-running Coroutine which can decide by itself whether to run its code in current or parallel thread

No. Code in suspendCoroutine cannot be a coroutine itself. The block of code that suspendCoroutine accepts is not a suspending function. Basically, the invocation of suspendCoroutine breaks out of coroutine into a regular non-suspending code where you are completely on your own in managing your asynchrony and concurrency via call-backs and other means.

Sorry, but I did not quite got what you were trying to achieve with your second example. Can you elaborate, please?

2 Likes

I have read your answer attentively and seems our thoughts do not sound yet. So let me be more specific.
Suppose i’m developing a lib based on Coroutines API. And i’m implementing a cancellable coroutine of my own.
Now let’s look at the code sample below. Here are functions runCoroutine and runSuspend that accept suspend code as parameter. Besides i want function runSuspend to be cancellable since it can execute long-running code. Let’s put aside the way it’s implemented here, and focus on the logic of the Coroutine API itself. The canceller is supposed to cancel current runSuspend body in the way the code in runCoroutine can continue execution after runSuspend. I expect it to be done as soon as canceller.cancel() is called. The executor instance (or its null value) here controls whether a continuation runs in the same or new thread.

fun main(args: Array<String>) {
	val canceller = Canceller()
	thread { Thread.sleep(500); canceller.cancel() }
	runCoroutine {
		println("$time   [Coroutine 0]  Before runSuspend")
		runSuspend(canceller) {
			// any long-running (e.g. suspendable) code here ...
			println("$time   [Coroutine 1]  Started")
			Thread.sleep(1000)
			println("$time   [Coroutine 1]  Finished")
		}
		println("$time   [Coroutine 0]  After runSuspend")
	}
	executor?.awaitTermination(2, TimeUnit.SECONDS)
	executor?.shutdown()
}

So when each intercepted continuation runs in a new thread (executor is NOT null) the output is:

0.0 sec [Coroutine 0] Before runSuspend
0.0 sec [Coroutine 1] Started
0.5 sec [Coroutine 1] Cancelled
0.5 sec [Coroutine 0] After runSuspend
0.5 sec [Coroutine 0] Complete
1.0 sec [Coroutine 1] Finished
1.0 sec [Coroutine 1] Complete

Which is exactly what i expec. Cancellation is called at 0.5 second. And at the same moment the [Coroutine 0] is continued and complete.

Although when each intercepted continuation runs in the same thread (executor is null) the output is:

0.0 sec [Coroutine 0] Before runSuspend
0.0 sec [Coroutine 1] Started
0.5 sec [Coroutine 1] Cancelled
1.0 sec [Coroutine 1] Finished
1.0 sec [Coroutine 1] Complete
1.0 sec [Coroutine 0] After runSuspend
1.0 sec [Coroutine 0] Complete

Which is not desired behavior since [Coroutine 0] continues and completes at the 1.0 second, after [Coroutine 1] completion but not right after cancellation. That is in fact because of the logic of implementation of the suspendCoroutine{ continuation -> ...} API function which in spite of calling its continuation.resume(...) holds the actual resumption up to the moment it’s done executing its body.
At that point i expect you may start convincing me to implement my code in other way, but again the question is what is the logic behind this implementation of the Coroutine API. And if there is no principle design contradiction why not review it. It seems sensible to be able to resume continuation of suspendCoroutine{ continuation -> ...} right at the moment its continuation.resume is called. Why wait its body to finish?

val executor: ExecutorService? = null
//val executor: ExecutorService? = Executors.newFixedThreadPool(4)
val startTime = System.currentTimeMillis()
val time get() = DecimalFormat("0.0 sec").format((System.currentTimeMillis() - startTime).toInt() / 1000f)

fun runCoroutine(code: suspend () -> Unit) {
	code.startCoroutine(MyCoroutine(0, null))
}

suspend fun <T> runSuspend(canceller: Canceller, code: suspend () -> T): Optional<T> = suspendCoroutine { cont ->
	val compl = MyCoroutine(1, cont)
	canceller.coroutine = compl
	code.startCoroutine(compl)
}

class Canceller {
	lateinit var coroutine: MyCoroutine<*>
	fun cancel() = coroutine.cancel()
}

class MyCoroutine<T>(val id: Int, val cont: Continuation<Optional<T>>?): Continuation<T> {
	override val context = Interceptor()
	private var complete  = false
	fun cancel() {
		if (complete) return else complete = true
		println("$time   $this  Cancelled")
		cont?.resume(Optional.empty())
	}
	override fun resume(value: T) {
		println("$time   $this  Complete")
		if (complete) return else complete = true
		cont?.resume(Optional.of(value))
	}
	override fun resumeWithException(exception: Throwable) {
		println("$time   $this  Complete with exception: $exception")
		if (complete) return else complete = true
		cont?.resume(Optional.empty())
	}
	override fun toString() = "[Coroutine $id]"
	
	inner class Interceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
		override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
			override val context = continuation.context
			override fun resume(value: T) {
				executor?.execute { continuation.resume(value) } ?: continuation.resume(value)
			}
			override fun resumeWithException(exception: Throwable) {
				executor?.execute { continuation.resumeWithException(exception) } ?: continuation.resumeWithException(exception)
			}
		}
	}
}

Now about the second point.

In this slightly modified sample Canceller tries to track coroutine/suspension with help of Interceptor in order to cancel exactly that coroutine which is currently executing its code. In this case it should cancel [Coroutine 0] when it’s sleeping. Each time Interceptor intercepts continuation it passes the coroutine it belongs to to Canceller. So i expect the Canceller when its cancel() method is called, to cancel current coroutine (last intercepted).

fun main(args: Array<String>) {
	thread { Thread.sleep(500); canceller.cancel() }
	runCoroutine {
		println("$time   [Coroutine 0]  Before runSuspend;  current: ${canceller.current}")
		runSuspend {
			println("$time   [Coroutine 1]  Body;  current: ${canceller.current}")
		}
		println("$time   [Coroutine 0]  After runSuspend;  current: ${canceller.current}")
		Thread.sleep(1000)
		println("$time   [Coroutine 0]  After sleep;  current: ${canceller.current}")
	}
	executor?.awaitTermination(2, TimeUnit.SECONDS)
	executor?.shutdown()
}

So when each intercepted continuation runs in a new thread (executor is NOT null) the output is:

1: 0.0 sec Canceller: intercept current: [Coroutine 0]
2: 0.0 sec [Coroutine 0] Before runSuspend; current: [Coroutine 0]
3: 0.0 sec Canceller: intercept current: [Coroutine 1]
4: 0.0 sec [Coroutine 1] Body; current: [Coroutine 1]
5: 0.0 sec [Coroutine 1] Complete
6: 0.0 sec Canceller: intercept current: [Coroutine 0]
7: 0.0 sec [Coroutine 0] After runSuspend; current: [Coroutine 0]
8: 0.5 sec Canceller: Cancelling current: [Coroutine 0]
9: 0.5 sec [Coroutine 0] Cancelled
10: 1.0 sec [Coroutine 0] After sleep; current: [Coroutine 0]

Notice in line 7 [Coroutine 0] matches the current coroutine. And line 8, 9 show the cancelling coroutine is the same as actual current which is [Coroutine 0]. Which is what’s expected.

Although when each intercepted continuation runs in the same thread (executor is null) the output is:

1: 0.0 sec Canceller: intercept current: [Coroutine 0]
2: 0.0 sec [Coroutine 0] Before runSuspend; current: [Coroutine 0]
3: 0.0 sec Canceller: intercept current: [Coroutine 1]
4: 0.0 sec [Coroutine 1] Body; current: [Coroutine 1]
5: 0.0 sec [Coroutine 1] Complete
6: 0.0 sec [Coroutine 0] After runSuspend; current: [Coroutine 1]
7: 0.5 sec Canceller: Cancelling current: [Coroutine 1]
8: 1.0 sec [Coroutine 0] After sleep; current: [Coroutine 1]
9: 1.0 sec [Coroutine 0] Complete

Notice the absence of line 6 from prev printing. It’s because interceptor did not intercepted continuation of [Coroutine 0]. Consequently in line 6 of later printing actual coroutine is [Coroutine 0] but [Coroutine 1] is shown as current. And that’s why Canceller is trying to cancel [Coroutine 1] (see line 7, 8).
This shows that ContinuationInterceptor intercepts continuation only in case if Thread.currentThread isn’t the same that the ongoing suspension has been started from. This eliminates using Interceptor as the general tool for intercepting continuations, making it more like ThreadJumper but not actual Interceptor. So if there is no strong design consideration behind this decision why not review this logic? It seems sensible that Interceptor would intercept every continuation and decided itself whether to run it in the same thread or jump to another.

//val executor: ExecutorService? = null
val executor: ExecutorService? = Executors.newFixedThreadPool(4)
val canceller = Canceller()
var lineN = 0
val startTime = System.currentTimeMillis()
val time get() = "${++lineN}:   "+DecimalFormat("0.0 sec").format((System.currentTimeMillis() - startTime).toInt() / 1000f)

fun runCoroutine(code: suspend () -> Unit) {
	code.startCoroutine(MyCoroutine(0, null))
}

suspend fun <T> runSuspend(code: suspend () -> T): Optional<T> = suspendCoroutine { cont ->
	val compl = MyCoroutine(1, cont)
	code.startCoroutine(compl)
}

class MyCoroutine<T>(val id: Int, val cont: Continuation<Optional<T>>?): Continuation<T> {
	override val context = canceller + Interceptor()
	var complete  = false
	fun cancel() {
		if (complete) return else complete = true
		println("$time   $this  Cancelled")
		cont?.resume(Optional.empty())
	}
	override fun resume(value: T) {
		if (complete) return else complete = true
		println("$time   $this  Complete")
		cont?.resume(Optional.of(value))
	}
	override fun resumeWithException(exception: Throwable) {
		if (complete) return else complete = true
		println("$time   $this  Complete with exception: $exception")
		cont?.resume(Optional.empty())
	}
	override fun toString() = "[Coroutine $id]"
	
	inner class Interceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
		override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
			override val context = continuation.context
			override fun resume(value: T) {
				canceller.intercept(this@MyCoroutine)
				executor?.execute { continuation.resume(value) } ?: continuation.resume(value)
			}
			override fun resumeWithException(exception: Throwable) {
				canceller.intercept(this@MyCoroutine)
				executor?.execute { continuation.resumeWithException(exception) } ?: continuation.resumeWithException(exception)
			}
		}
	}
}

class Canceller: AbstractCoroutineContextElement(Canceller) {
	companion object Key: CoroutineContext.Key<Canceller>
	
	var current: MyCoroutine<*>? = null
	val currentComplete get() = current?.complete ?: false
	fun cancel() {
		println("$time   Tracker: Cancelling current: $current")
		current?.cancel()
	}
	fun intercept(c: MyCoroutine<*>) {
		current = c
		println("$time   Tracker: intercept current: $current")
	}
}

The short answer is: that is the only conceivable way it could be implemented. There is simply no way to abort the execution of the body of suspendCoroutine at a moment when resume is invoked. Attempting to do so whould lead to all sorts of constency problems.

That is not the case. Interception logic has nothing to do with the current thread. You can have all your coroutines executed in the single thread and interceptor would still work.

The ContinuationInterceptor intercepts all suspending functions that do suspend execution by design. Suspending function suspends coroutine when the invocation of suspendCoroutine returns without invoking resume. Suspending functions that did not suspend the execution (that is, they invoke resume before returning) are not intercepted.

2 Likes

Thank you for intelligible answer. Although it’s a pity to know there is no way to implement suspendCoroutine in other way in regard to abortion of its body, at least the reason seems compelling. In regard to the Interceptor issue i still have a question.
The name of the class ContinuationInterceptor as well as, i hope, its concept is supposed to literally mean intercepting Continuations. I.e. any Continuation that it comes across. Than why this selectivity in whether suspendCoroutine returns with or without invoking resume? Why not just follow the logic?
Let’s look at the following sample. It’s the simplest illustration of the issue. The function runCoroutine starts coroutine with interceptor that is meant to intercept its continuations.

fun main(args: Array<String>) {
	runCoroutine {
		println("point 1")
		suspendCoroutine<Int> { cont -> cont.resume(1) }
		println("point 2")
		suspendCoroutine<Int> { cont -> thread { cont.resume(1) } }
		println("point 3")
	}
}

fun runCoroutine(code: suspend () -> Unit) {
	code.startCoroutine(MyCoroutine())
}

class MyCoroutine<T>: Continuation<T> {
	override val context = Interceptor()
	override fun resume(value: T) = println("complete")
	override fun resumeWithException(exception: Throwable) = println("complete with $exception")
}

class Interceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
	override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
		override val context = continuation.context
		override fun resume(value: T) = run{ println("intercepted"); continuation.resume(value) }
		override fun resumeWithException(exception: Throwable) =run { println("intercepted"); continuation.resumeWithException(exception) }
	}
}

This prints:

intercepted
point 1
point 2
intercepted
point 3
complete

By the logic you’ve presented it should skip suspension after the point 1 and just intercept suspension after the point 2. Which it does according to the printing above. But according to more general logic both suspensions involve continuations to resume the main coroutine. The decompiled coroutine (below is the simplified one) confirms that each suspension is the case of continuation. Ergo it’s the subject for ContinuationInterceptor since it is the continuation interceptor and not the JustSuspensionResumedFromOtherThreadContinuationInterceptor. Doesn’t it seem reasonable?
And in cases like one i’ve presented in the previous response it is really essential.

public final Object doResume(@Nullable Object var1_1, @Nullable Throwable var2_2) {
    switch (var0.label) {
        case 0: {
            var3_4 = "point 1";
            System.out.println(var3_4);
            this.label = 1;
            cont = (Continuation)safe$iv;
            cont.resume((Object)1);
            v1 = safe$iv.getResult();
            if (v1 == var10_3) {
                return var10_3;
            }
            ** GOTO lbl26
        }
        case 1: {
            v1 = data;
            lbl26: // 2 sources:
            $continuation$iv = "point 2";
            System.out.println($continuation$iv);
            this.label = 2;
            cont = (Continuation)safe$iv;
            ThreadsKt.thread$default(new Function0<Unit>(cont){
                public final void invoke() {
                    this.cont.resume((Object)1);
                }
            });
            v3 = safe$iv.getResult();
            if (v3 == var10_3) {
                return var10_3;
            }
            ** GOTO lbl45
        }
        case 2: {
            v3 = data;
            lbl45: // 2 sources:
            $continuation$iv = "point 3";
            System.out.println($continuation$iv);
            return Unit.INSTANCE;
        }
    }
}

PS:
Correct me if i’m wrong but it seems the logic of the current implementation is grounded on the fact that it’s easier for a user to implement ContinuationInterceptor not carrying about the case when coroutine is resumed in the same thread as it had been suspended in. And because in this case there is no need to switch to another thread it may seem simpler just not to involve interceptor at all. In other words it is for the reason a user code would look like this:

class SimpleInterceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
	override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
		override val context = continuation.context
		override fun resume(value: T) {
			executor.execute{ continuation.resume(value) }
		}
		override fun resumeWithException(exception: Throwable) {
			executor.execute { continuation.resumeWithException(exception) }
		}
	}
}

But is requires a user code a little to accommodate it to more general case so it would look like this:

class SimpleInterceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
	private var thread: Thread = Thread.currentThread()
	override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
		override val context = continuation.context
		override fun resume(value: T) {
			if (thread == Thread.currentThread()) continuation.resume(value)
			else executor.execute{ continuation.resume(value) }
		}
		override fun resumeWithException(exception: Throwable) {
			if (thread == Thread.currentThread()) continuation.resumeWithException(exception)
			else executor.execute { continuation.resumeWithException(exception) }
		}
	}
}

But it doesn’t seem a big deal.

It is a similar story with ContinuationInterceptor. There is no way to implement in any other way, since interceptor wraps around the execution it has to do so at the point where execution stack is unrolled, but execution stack is unrolled only when coroutine does suspend its execution, so we can only intercept at the actual suspension points.

This is not a bug, but a feature, because interceptors are designed to aid in managing threads for coroutines and a coroutine can only resume in another thread if it did suspend in its current thread, so the way interceptors are implemented perfectly aligns with the corresponding use-case.

1 Like

Okay than. Although behavior (limitations) you’ve described above wouldn’t be redundant in the Coroutine API documentation. So please add it for clarity.
Especially misleading looks this example of Continuation interceptor from Informal description of Revision 3.2.

launch(CommonPool) {
    initialCode() // execution of initial code
    f1.await() // suspension point #1
    block1() // execution #1
    f2.await() // suspension point #2
    block2() // execution #2
}

with the following statement:

Continuation interceptor has an option to intercept and wrap the continuation that corresponds to the execution of initialCode, block1, and block2 from their resumption to the subsequent suspension points.

Since interceptor actually doesn’t have this option to intercept block1, and block2. Particularly in case if f1 or/and f2 have already finished their execution at the point of await call.

Thanks for feedback. I’ve clarified the wording in the coroutines design document to ensure that the distinction between potentiation suspension points (suspending function invocation) and the actual suspensions is more clear. See the last commit. I hope it helps.

2 Likes