For those who don't want to read the whole thread, here is a short recap:
First point:
A coroutine suspended by the Coroutine API's suspendCoroutine function is not resumed immediately when its continuation.resume method is called; it is resumed only after the suspendCoroutine body has finished executing.
Second point:
ContinuationInterceptor doesn't intercept the continuation of a suspension that is resumed in the same thread it was started in.
The original question was the following:
In the following sample, the suspendCoroutine function (the Suspension) in exec2 executes its body sequentially within a Coroutine started by exec1. At some moment before the Suspension body has finished, code on another thread calls continuation.resume on the Suspension (resumeWithException in the sample). I expected the Suspension to be resumed as soon as continuation.resume is called, thereby resuming the Coroutine at that same moment. What I observe instead is that the Suspension keeps executing its body and the Coroutine is resumed only after the body is done.
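import kotlin.concurrent.thread
import kotlin.coroutines.experimental.*

fun main(args: Array<String>) {
    exec1 {
        exec2()
    }
}

// Starts the given suspending block as a coroutine and reports how it completes.
fun exec1(code: suspend () -> Unit) {
    code.startCoroutine(object : Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resume(value: Unit) = println("exec1 resume")
        override fun resumeWithException(exception: Throwable) = println("exec1 resumeWithException $exception")
    })
}

suspend fun exec2(): Unit = suspendCoroutine { cont ->
    println("exec2 started")
    // Another thread resumes the continuation while the body is still running.
    thread {
        println("exec2 canceled")
        cont.resumeWithException(Exception("oops"))
    }
    // Simulates the long-running body on the calling thread.
    Thread.sleep(1000)
    println("exec2 finished")
    // cont.resume(Unit)
}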
This prints:
exec2 started
exec2 canceled
exec2 finished
exec1 resumeWithException java.lang.Exception: oops
But the expected output is:
exec2 started
exec2 canceled
exec1 resumeWithException java.lang.Exception: oops
exec2 finished
Why? Because the code within a suspendCoroutine function can itself be another long-running Coroutine, which may decide on its own whether to run in the current thread or in a parallel one. Since it is long-running, it may need to be cancelled at some point before it finishes. With the current Suspension behavior, however, it is impossible to actually cancel the body's execution when it runs in the thread it was started in.
Maybe the authors have a strong reason for this, which I'd like to know. Otherwise, why not make a Suspension resume at the moment of resumption rather than the moment its body finishes executing?
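A possible workaround, sketched here under assumptions rather than taken from the thread: keep the suspendCoroutine block short by moving the long-running body to a worker thread, so the block returns right away and a later resumeWithException can take effect without waiting. The sketch uses the stable kotlin.coroutines API (Kotlin 1.3+) instead of the experimental one in the sample above; events and runDemo are hypothetical names introduced only for illustration.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: thread-safe log of events in the order they are recorded.
val events = CopyOnWriteArrayList<String>()

suspend fun exec2(): Unit = suspendCoroutine { cont ->
    events += "exec2 started"
    // The long-running body runs on a worker thread, so this block returns at once.
    val worker = thread {
        try {
            Thread.sleep(1000)
            events += "exec2 finished"
        } catch (e: InterruptedException) {
            events += "exec2 interrupted"
        }
    }
    // The canceller stops the worker, then resumes the coroutine with a failure.
    thread {
        events += "exec2 canceled"
        worker.interrupt()
        cont.resumeWithException(Exception("oops"))
    }
}

// Hypothetical driver: starts exec2 as a coroutine and waits for its completion.
fun runDemo(): List<String> {
    val done = CountDownLatch(1)
    val code: suspend () -> Unit = { exec2() }
    code.startCoroutine(object : Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            events += "exec1 completed: failure=${result.isFailure}"
            done.countDown()
        }
    })
    done.await()
    return events.toList()
}

fun main() {
    runDemo().forEach(::println)
}
```

In this arrangement the body is cancelled by interrupting the worker, and the Coroutine completes with the exception without waiting for the one-second sleep. Whether the worker's "exec2 interrupted" entry is recorded before or after the completion entry depends on thread scheduling.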
The second point:
In the following sample I'm trying to track the current Coroutine by intercepting it. The Interceptor intercepts a resumption after suspension only when it jumps to another thread (runner is not null). Otherwise (runner is null), when the coroutine is resumed after suspension in the same thread, the Interceptor does not intercept the resumption. This makes the Interceptor useful only for jumping between threads, but useless as a general tool for actually intercepting resumptions.
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.*

fun main(args: Array<String>) {
    coroutine {
        suspend { 1 }
    }
    // Give the pool threads a second to run the resumptions, then shut the pool down.
    runner?.awaitTermination(1, TimeUnit.SECONDS)
    runner?.shutdown()
}

// Set to null to resume every coroutine in the calling thread.
val runner: ExecutorService? = /*null*/Executors.newFixedThreadPool(2)
var IDS = 0
val tracker = Tracker()

fun coroutine(code: suspend () -> Unit) {
    code.startCoroutine(Coroutine(IDS++, null))
}

suspend fun <T> suspend(code: suspend () -> T): T = suspendCoroutine { cont ->
    code.startCoroutine(Coroutine(IDS++, cont))
}

class Coroutine<T>(val id: Int, val cont: Continuation<T>?): Continuation<T> {
    override val context = tracker + Interceptor()

    override fun resume(value: T) {
        println("resume $this: current: ${tracker.current}")
        cont?.resume(value)
    }

    override fun resumeWithException(exception: Throwable) {
        println("resumeWE $this: current: ${tracker.current}")
        cont?.resumeWithException(exception)
    }

    override fun toString() = "[Coroutine $id]"

    inner class Interceptor: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = object: Continuation<T> {
            override val context = continuation.context

            // Record the coroutine as current, then resume on the pool or inline.
            override fun resume(value: T) {
                tracker.intercept(this@Coroutine)
                runner?.execute { continuation.resume(value) } ?: continuation.resume(value)
            }

            override fun resumeWithException(exception: Throwable) {
                tracker.intercept(this@Coroutine)
                runner?.execute { continuation.resumeWithException(exception) } ?: continuation.resumeWithException(exception)
            }
        }
    }
}

class Tracker: AbstractCoroutineContextElement(Tracker) {
    companion object Key: CoroutineContext.Key<Tracker>

    var current: Coroutine<*>? = null

    fun intercept(c: Coroutine<*>) {
        current = c
        println("intercepted current: $current")
    }
}
Prints if runner is not null (each coroutine executes in its own thread):
intercepted current: [Coroutine 0]
intercepted current: [Coroutine 1]
resume [Coroutine 1]: current: [Coroutine 1]
intercepted current: [Coroutine 0]
resume [Coroutine 0]: current: [Coroutine 0]
State is consistent since the current Coroutine corresponds to the resumed one. See the last line.
Prints if runner is null (execution is in the same thread):
intercepted current: [Coroutine 0]
intercepted current: [Coroutine 1]
resume [Coroutine 1]: current: [Coroutine 1]
resume [Coroutine 0]: current: [Coroutine 1]
State is NOT consistent since the current Coroutine does not correspond to the resumed one. See the last line. Also note that line 4 of the previous output is missing here.
Why not make the Interceptor intercept every resumption, not only those that occur in another thread?
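A smaller experiment may help narrow this down. The sketch below is a probe under stated assumptions, not a statement of the library's contract: it uses the stable kotlin.coroutines API (Kotlin 1.3+) rather than the experimental one in the sample, and CountingInterceptor and interceptedResumptions are hypothetical names. It counts how many resumptions pass through an interceptor when continuation.resume is called inside the suspendCoroutine block on the same thread, compared with a later call from another thread.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor that only counts the resumptions passing through it.
class CountingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    val count = AtomicInteger()

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                count.incrementAndGet()
                continuation.resumeWith(result)
            }
        }
}

// Runs one coroutine containing a single suspendCoroutine call and returns
// how many resumptions (including the initial start) went through the interceptor.
fun interceptedResumptions(resumeFromOtherThread: Boolean): Int {
    val interceptor = CountingInterceptor()
    val done = CountDownLatch(1)
    val code: suspend () -> Unit = {
        suspendCoroutine<Unit> { cont ->
            if (resumeFromOtherThread) {
                // Resume well after the block has returned, from another thread.
                thread { Thread.sleep(100); cont.resume(Unit) }
            } else {
                // Resume while still inside the block, on the calling thread.
                cont.resume(Unit)
            }
        }
    }
    code.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = interceptor
        override fun resumeWith(result: Result<Unit>) = done.countDown()
    })
    done.await()
    return interceptor.count.get()
}

fun main() {
    println("same thread: ${interceptedResumptions(false)}")
    println("other thread: ${interceptedResumptions(true)}")
}
```

If the same-thread count comes out lower, that would suggest the coroutine never actually suspended in that case: the value passed to resume is handed back by suspendCoroutine as soon as the block ends, so there is no resumption for the Interceptor to see. That reading would also be consistent with the first point.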