Coroutine extra 'interceptContinuation' call


#1

Running the sample code below with line 4 of function runSuspension (the call to suspendDummy) commented or uncommented reveals strange behavior of ContinuationInterceptor. With that line uncommented, the Interceptor of the original coroutine (coroutine [0], started in line 3 of the main function) is first called on coroutine start, which is expected. It is then called again inside runSuspension, before the second coroutine (coroutine [1], started in line 3 of runSuspension). This second call seems to do nothing essential and looks like a bug. (What is it trying to intercept there?) On the other hand, if that line is commented out, the Interceptor behaves as expected, without the extra interception call. The same expected behavior, even with the call to suspendDummy uncommented, is observed when the whole body of runSuspension is inlined at the call site in the main function.
The output with the call to suspendDummy commented and uncommented is shown below.
I’d like to note that this behavior is essential for the library I develop, since I use the Interceptor’s interceptContinuation method to trigger a currently executing coroutine, and I fail to do that if it’s called on unexpected occasions.
Coroutine API authors, please correct this behavior if it’s faulty, or document the cases in which it can occur.

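import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.*

fun main(args: Array<String>) {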
	println("Before Coroutine")
	runCoroutine(0) {
		println("[0] Before suspension")
		runSuspension(1)
		println("[0] After suspension")
	}
	println("After Coroutine")
}

fun runCoroutine(id:Int, code: suspend () -> Unit) = code.startCoroutine(Completion(id))

suspend fun runSuspension(id: Int) = suspendCoroutine<Unit> {
	println("[$id] Before Coroutine")
	runCoroutine(id) {
		suspendDummy() //   !!!  UN/COMMENT this line to switch behavior
		println("[$id] Inside Coroutine")
		it.resume(Unit)
	}
}

suspend fun suspendDummy() = 1

class Completion<T>(val id: Int): Continuation<T> {
	override val context = Interceptor(id)
	override fun resume(value: T) = println("[$id] coroutine Complete")
	override fun resumeWithException(exception: Throwable) = println("[$id] coroutine Complete with $exception")
}

class Interceptor(val id: Int): AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
	var again = false
	override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
		println("[$id] INTERCEPT  CONTINUATION ${if (again) "Again !!!" else ""}")
		val prev = again
		again = true
		return Interception("$id${if (prev) "+" else ""}", continuation)
	}
}

class Interception<T>(val id: String, val continuation: Continuation<T>): Continuation<T> {
	override val context = continuation.context
	override fun resume(value: T) {
		executor.execute {
			println("[$id] resume  interception")
			continuation.resume(value)
		}
	}
	override fun resumeWithException(exception: Throwable) {
		println("[$id] resume  interception with $exception")
		continuation.resumeWithException(exception)
	}
}

val executor = Executors.newCachedThreadPool().apply {
	execute {
		awaitTermination(1000, TimeUnit.MILLISECONDS)
		shutdown()
	}
}

Output when the suspendDummy() line is commented out. This is the expected behavior:

Before Coroutine
[0] INTERCEPT CONTINUATION
After Coroutine
[0] resume interception
[0] Before suspension
[1] Before Coroutine
[1] INTERCEPT CONTINUATION
[1] resume interception
[1] Inside Coroutine
[0] resume interception
[1] coroutine Complete
[0] After suspension
[0] coroutine Complete

Output when the suspendDummy() line is uncommented. Note lines 6 and 11, which differ from the output above. This is where the strange behavior reveals itself:

Before Coroutine
[0] INTERCEPT CONTINUATION
After Coroutine
[0] resume interception
[0] Before suspension
[0] INTERCEPT CONTINUATION ------- Again !!!
[1] Before Coroutine
[1] INTERCEPT CONTINUATION
[1] resume interception
[1] Inside Coroutine
[0+] resume interception ------- Note that call is from second instance of Interception class
[1] coroutine Complete
[0] After suspension
[0] coroutine Complete


#2

It should intercept the second time, because you are starting a new coroutine (each use of startCoroutine starts a new one). The fact that it does not intercept when you comment out suspendDummy() seems like a bug at first glance. I’ll verify tomorrow and will file a bug if that is the case.

Note that you can start a coroutine that does not intercept its initial continuation (until the first suspension point) using the startCoroutineUninterceptedOrReturn intrinsic function. The kotlinx.coroutines library uses it to implement proper interception for some complex functions, such as the select expression, and to optionally implement C#-style behavior for async and other builders via the CoroutineStart.UNDISPATCHED option.
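
To illustrate the difference between the two ways of starting a coroutine, here is a minimal sketch. It assumes the stable kotlin.coroutines API (Kotlin 1.3 and later) rather than the experimental package used in the original sample. CountingInterceptor and startAndCount are hypothetical names introduced only for this illustration.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

// Hypothetical interceptor: counts how often it is asked to wrap a continuation.
class CountingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    var calls = 0
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        calls++
        return continuation // no wrapping; we only count
    }
}

// Starts an empty coroutine body either way and reports the number of interceptContinuation calls.
fun startAndCount(unintercepted: Boolean): Int {
    val interceptor = CountingInterceptor()
    val completion = Continuation<Unit>(interceptor) { /* result ignored */ }
    val block: suspend () -> Unit = { } // no suspension points
    if (unintercepted) {
        // Runs the body right here; the initial continuation is not passed to the interceptor.
        // The return value (a result or COROUTINE_SUSPENDED) is ignored in this sketch.
        block.startCoroutineUninterceptedOrReturn(completion)
    } else {
        // Intercepts the initial continuation before resuming it.
        block.startCoroutine(completion)
    }
    return interceptor.calls
}

fun main() {
    println("startCoroutine: ${startAndCount(unintercepted = false)} interceptContinuation call(s)")
    println("startCoroutineUninterceptedOrReturn: ${startAndCount(unintercepted = true)} interceptContinuation call(s)")
}
```

With this body, which never suspends, the intercepted start should report one call and the unintercepted start none. Note that when startCoroutineUninterceptedOrReturn returns a result directly, the completion continuation is not invoked, so real code has to handle the returned value itself.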


#3

According to my observations, in the normal scenario the interceptor’s interceptContinuation method is called by the Coroutine API (hereafter just "the API") before starting a new coroutine, to get an instance of a wrapper Continuation. The API stores that wrapper and uses it each time it needs to resume that coroutine, giving it a chance to be intercepted. That is, the first resume is called when the coroutine body begins executing up to the first suspension point, and subsequent resumes are called each time the coroutine body returns from a later suspension.
So if the interceptor’s interceptContinuation method is meant to get the wrapper Continuation of the coroutine that is about to execute, then why does the API call interceptContinuation for a coroutine that has already started? Why does the API need a second instance of the wrapper Continuation for a coroutine that is running?
In the example above each coroutine has its own Interceptor as the context. When the main coroutine starts, the API calls the main interceptor’s interceptContinuation, which is OK. Later, when it enters suspendCoroutine with the second coroutine (see the runSuspension function above), the API for some unknown reason calls the main interceptor’s interceptContinuation again. But why? Only after that does the API call the second coroutine’s interceptContinuation, which is OK.
To my mind, the second call to the main interceptor’s interceptContinuation is excessive. It occurs not before the start of the main coroutine but after it, right before the first suspension of that coroutine. And as far as I can infer from extended testing, it is not called before the following suspensions.


#4

Ok. I finally got to a computer to check that. There is a bug, but it is not what I originally thought it was:
https://youtrack.jetbrains.com/issue/KT-18486

In short, when you uncomment the invocation of suspendDummy(), it tricks the compiler into generating inefficient code for runSuspension. Semantically it is correct, though. interceptContinuation should be called, by design, for each instance of a state machine, so if you have multiple state machines (multiple complex functions) running as part of the same coroutine, then you’ll see interceptContinuation invoked multiple times (once for each state-machine instance). It is just that in this case the function runSuspension, which should have been treated by the compiler as a simple tail-only suspending function, is mistakenly treated as a complex suspending function with a state machine.

UPDATE: Good news is that this bug is already fixed in the Kotlin master (it will be fixed in version 1.1.4)
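
The distinction between a tail-only suspending function and a complex one with its own state machine can be observed with a small sketch like the following. It assumes the stable kotlin.coroutines API (Kotlin 1.3 and later) instead of the experimental one from the original sample. CountingInterceptor, tailOnly, complex and countInterceptions are hypothetical names used only for illustration. The exact counts depend on how the compiler treats each function, so the sketch only prints them.

```kotlin
import kotlin.coroutines.*

// Hypothetical interceptor: counts interceptContinuation calls and wraps nothing.
class CountingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    var calls = 0
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        calls++
        return continuation
    }
}

// Tail-only: the single suspending call is the last thing the function does,
// so the compiler does not need to generate a state machine for it.
suspend fun tailOnly(): Unit = suspendCoroutine<Unit> { it.resume(Unit) }

// Complex: code runs after the suspension point, so the function gets its own state machine.
suspend fun complex() {
    suspendCoroutine<Unit> { it.resume(Unit) }
    println("complex: after the suspension point")
}

// Runs the body as a new coroutine and returns how often the interceptor was consulted.
fun countInterceptions(body: suspend () -> Unit): Int {
    val interceptor = CountingInterceptor()
    body.startCoroutine(Continuation(interceptor) { /* result ignored */ })
    return interceptor.calls
}

fun main() {
    println("tail-only: ${countInterceptions { tailOnly() }} interceptContinuation call(s)")
    println("complex:   ${countInterceptions { complex() }} interceptContinuation call(s)")
}
```

If the description above holds for your compiler version, the complex variant should report at least one call more than the tail-only variant: one for the coroutine's own lambda and one for the state machine of complex.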


#5

Kotlin coroutines are stackful. You can have a whole stack of suspending functions, each with its own state machine, running inside a single coroutine. Each instance of a state machine (it is an object that implements a continuation) gets passed to interceptContinuation once, so that the suspension points in this state machine can be properly intercepted.


#6

Ok. Thanks. Just for the sake of clarity: does this mean that in the fixed version the API will request interceptContinuation for each coroutine only once, right before its initial resumption?


#7

No. interceptContinuation is invoked once for each state machine, and there can be many state machines per coroutine. Interceptors are designed to intercept coroutine resumptions, and your actual interception logic should not be inside the interceptContinuation function, but rather inside the resume function of the resulting continuation.
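
As a rough sketch of that advice, the interceptor below does nothing except wrap, and the per-resumption logic lives in the wrapper continuation. It assumes the stable kotlin.coroutines API (Kotlin 1.3 and later), where resume and resumeWithException were merged into a single resumeWith. LoggingContinuation, LoggingInterceptor and runDemo are hypothetical names for illustration only.

```kotlin
import kotlin.coroutines.*

// Wrapper returned by the interceptor; the interception logic runs on every resumption.
class LoggingContinuation<T>(
    private val delegate: Continuation<T>,
    private val log: MutableList<String>
) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context
    override fun resumeWith(result: Result<T>) {
        log.add("resume")           // the actual interception logic
        delegate.resumeWith(result) // then hand control to the wrapped continuation
    }
}

// The interceptor itself has no side effects; it only wraps.
class LoggingInterceptor(private val log: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        LoggingContinuation(continuation, log)
}

// Starts a coroutine that really suspends once, resumes it, and returns the recorded events.
fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        log.add("body: start")
        suspendCoroutine<Unit> { saved = it } // suspend until resumed from outside
        log.add("body: end")
    }
    body.startCoroutine(Continuation(LoggingInterceptor(log)) { log.add("completed") })
    saved?.resume(Unit) // second resumption, again routed through the wrapper
    return log
}

fun main() {
    runDemo().forEach(::println)
}
```

Because the logic sits in resumeWith, it does not matter how many times interceptContinuation is called or for which state machine: each actual resumption of the coroutine passes through the wrapper exactly once.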