Hello all. I’ve done my best to discover what is wrong, but to no avail. This problem is based on some work I’d done earlier and am convinced worked as expected. However, I am not able to reproduce the exception condition reliably, having tried back as far as kotlinx 0.14.1.
The following code demonstrates what I believe is either the issue or a hole in my understanding.
package coroutines
import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext
val dispatchExecutor: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)
// the thread local I want to maintain
val threadInt = ThreadLocal<Int>()
val latch = CountDownLatch(1)
fun main(args: Array<String>) {
val result = runBlocking {
val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
val intContext = InterceptingContext(dispatcher, latch)
async(intContext) {
println("Before exception")
throw Exception("Whoops")
println("After exception")
3 //arbitrary value to return from Deferred
}.await()
}
latch.await(5, TimeUnit.SECONDS)
println("Done $result")
}
class InterceptingContext(private val delegateInterceptor: ContinuationInterceptor,
private val latch: CountDownLatch)
: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
val wrapped = WrappedContinuation(continuation,
latch,
{ println("In pre-block") },
{ println("In post-block") })
return delegateInterceptor.interceptContinuation(wrapped)
}
}
class WrappedContinuation<in T>(
private val continuation: Continuation<T>,
private val latch: CountDownLatch,
private val preblock: () -> Unit,
private val postblock: () -> Unit)
: Continuation<T> {
override val context: CoroutineContext
get() = continuation.context
override fun resume(value: T) {
println("${Thread.currentThread().name} In resume")
preblock()
try {
continuation.resume(value)
} finally {
postblock()
}
}
override fun resumeWithException(exception: Throwable) {
println("${Thread.currentThread().name} In resumeWithException")
preblock()
try {
continuation.resumeWithException(exception)
} finally {
postblock()
latch.countDown()
}
}
}
The point of the above code is to enable arbitrary execution of code before and after any resumption of a continuation. I’ve made two assumptions:
- My WrappedContinuation instance will intercept and delegate “resume(…)” (It does)
- if any exception occurs during execution of resume, the WrappedContinuation instance will intercept and delegate “resumeWIthException(…)” (it does not)
Am I missing something? I’ve assumed symmetry in this case, and my endeavour to debug through the kotlinx code has not left me any wiser, though I’ve seen plenty of invocations of resumeWithException as a result of the exception…just not on my wrapped continuation!
output:
pool-1-thread-1 @coroutine#2 In resume
In pre-block
Before exception
In post-block
Exception in thread "main" java.lang.Exception: Whoops
at coroutines.TestExecutor2Kt$main$result$1$1.doResume(TestExecutor2.kt:29)
at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
at coroutines.WrappedContinuation.resume(TestExecutor2.kt:68)
at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any insight would be gratefully received.