[Coroutines] resumeWithException not working as expected with custom continuation

Hello all. I’ve done my best to discover what is wrong, but to no avail. This problem is based on some work I’d done earlier and am convinced worked as expected. However, I am not able to reproduce the exception condition reliably, having tried back as far as kotlinx 0.14.1.

The following code demonstrates what I believe is either the issue or a hole in my understanding.

package coroutines

import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.asCoroutineDispatcher  
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext

val dispatchExecutor: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

// the thread local I want to maintain
val threadInt = ThreadLocal<Int>()
val latch = CountDownLatch(1)
fun main(args: Array<String>) {

    val result = runBlocking {
        val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
        val intContext = InterceptingContext(dispatcher, latch)

        async(intContext) {
            println("Before exception")
            throw Exception("Whoops")
            println("After exception")
            3 //arbitrary value to return from Deferred
        }.await()

    }
    latch.await(5, TimeUnit.SECONDS)
    println("Done $result")
}

    class InterceptingContext(private val delegateInterceptor: ContinuationInterceptor,
                      private val latch: CountDownLatch)
    : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {

        val wrapped = WrappedContinuation(continuation,
                                      latch,
                                      { println("In pre-block") },
                                      { println("In post-block") })

        return delegateInterceptor.interceptContinuation(wrapped)
    }
}

class WrappedContinuation<in T>(
    private val continuation: Continuation<T>,
    private val latch: CountDownLatch,
    private val preblock: () -> Unit,
    private val postblock: () -> Unit)
: Continuation<T> {

    override val context: CoroutineContext
        get() = continuation.context

    override fun resume(value: T) {
        println("${Thread.currentThread().name} In resume")
        preblock()
        try {
            continuation.resume(value)
        } finally {
            postblock()
        }
    }

    override fun resumeWithException(exception: Throwable) {
        println("${Thread.currentThread().name} In resumeWithException")
        preblock()
        try {
            continuation.resumeWithException(exception)
        } finally {
            postblock()
            latch.countDown()
        }
    }

}

The point of the above code is to enable arbitrary execution of code before and after any resumption of a continuation. I’ve made two assumptions:

  1. My WrappedContinuation instance will intercept and delegate “resume(…)” (It does)
  2. if any exception occurs during execution of resume, the WrappedContinuation instance will intercept and delegate “resumeWIthException(…)” (it does not)

Am I missing something? I’ve assumed symmetry in this case, and my endeavour to debug through the kotlinx code has not left me any wiser, though I’ve seen plenty of invocations of resumeWithException as a result of the exception…just not on my wrapped continuation!

output:

pool-1-thread-1 @coroutine#2 In resume
In pre-block
Before exception
In post-block
Exception in thread "main" java.lang.Exception: Whoops
at coroutines.TestExecutor2Kt$main$result$1$1.doResume(TestExecutor2.kt:29)
at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
at coroutines.WrappedContinuation.resume(TestExecutor2.kt:68)
at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Any insight would be gratefully received.

Can you, please, explain what is not working? E.g. what results from running this code you’ve expected and what are the actual results you are getting?

I’m running this code and I’m not seeing any anomalies. The coroutine that you have in this code has a single continuation – from the start of the coroutine to its completion with the throw (it does not have suspensions so the initial continuation is all it has), so I’m seeing one interception occurring, printing:

In pre-block          // before continuation (starting coroutine)
Before exception  // execution code in the coroutine
In post-block        // after continuation (completing coroutine)

In my enthusiasm for using coroutines, I made some assumptions about how and when resumeWithException(…) would be called.

Assumption 1: If an exception is thrown in the coroutine after “onResume”, the same coroutine context would have resumeWithException(…) raised as an exception “callback” of sorts.

– the above appears to be wrong, given the test code already provided in this post. I think I also understand why my assumption was wrong.

Assumption 2: If coroutine B suspends on coroutine A, and coroutine A throws an Exception, then coroutine B would have “resumeWithException” raised.

so I tried a different test, based on the one above.

package coroutines

import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.Deferred  
import kotlinx.coroutines.experimental.asCoroutineDispatcher  
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext

//just an coroutines.getExecutor with two threads and a blocking backing queue
val dispatchExecutor1: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)
val dispatchExecutor2: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

// the thread local I want to maintain
fun main(args: Array<String>) {
    val dispatcher1: CoroutineDispatcher = dispatchExecutor1.asCoroutineDispatcher()
    val dispatcher2: CoroutineDispatcher = dispatchExecutor2.asCoroutineDispatcher()

    val deferred1: Deferred<Int> = dispatch(dispatcher1) {
        println("Before exception")
//        delay(5000)
        throw Exception("Whoops")
    }

    val deferred2: Deferred<Int> = dispatch(dispatcher2) {
        val result1 = deferred1.await()
        result1 + 1
    }

    runBlocking {
        val finalResult = deferred2.await()
        println("Done $finalResult")
    }

    dispatchExecutor1.shutdownNow()
    dispatchExecutor2.shutdownNow()
}

class InterceptingContext(private val delegateInterceptor: ContinuationInterceptor)
    : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {

        val wrapped = WrappedContinuation(continuation,
                                      { println("In pre-block ${Thread.currentThread().name}") },
                                      { println("In post-block ${Thread.currentThread().name}") })

        return delegateInterceptor.interceptContinuation(wrapped)
    }
}

class WrappedContinuation<in T>(
        private val continuation: Continuation<T>,
        private val preblock: () -> Unit,
        private val postblock: () -> Unit)
    : Continuation<T> {

    override val context: CoroutineContext
        get() = continuation.context

    override fun resume(value: T) {
        println("${Thread.currentThread().name} In resume")
        preblock()
        try {
            continuation.resume(value)

        } finally {
            postblock()
        }
    }

    override fun resumeWithException(exception: Throwable) {
        println("${Thread.currentThread().name} In resumeWithException")
        preblock()
        try {
            continuation.resumeWithException(exception)
        } finally {
            postblock()
        }
    }

}

private fun <R> dispatch(dispatcher: CoroutineDispatcher,
                     block: suspend () -> R): Deferred<R> {

    return try {
        async(InterceptingContext(dispatcher)) {
            block()
        }

    } catch (ree: RejectedExecutionException) {
        ree.printStackTrace()
        throw ree
    }
}

Main difference is that there are now two coroutines instead of one, and the second waits on the first. And here’s where I am confused again. If I uncomment the “delay(5000)” line in coroutine 1, then I do, in fact get resumeWithException raised in coroutine 2.

pool-1-thread-1 @coroutine#1 In resume
pool-2-thread-1 @coroutine#2 In resume
In pre-block pool-2-thread-1 @coroutine#2
In pre-block pool-1-thread-1 @coroutine#1
Before exception
In post-block pool-2-thread-1 @coroutine#2
In post-block pool-1-thread-1 @coroutine#1
pool-1-thread-2 @coroutine#1 In resume
In pre-block pool-1-thread-2 @coroutine#1
In post-block pool-1-thread-2 @coroutine#1
pool-2-thread-2 @coroutine#2 In resumeWithException
In pre-block pool-2-thread-2 @coroutine#2
In post-block pool-2-thread-2 @coroutine#2
Exception in thread "main" java.lang.Exception: Whoops
at coroutines.TestExecutor2Kt$main$deferred1$1.doResume(TestExecutor2.kt:28)
at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
at coroutines.WrappedContinuation.resume(TestExecutor2.kt:71)
at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

But if I leave the delay commented out, then resumeWithException never gets called…the exception just percolates out of the main method. This implies to me some timing issue in getting the resumeWithException invocation, in that coroutine1 failed before coroutine2 was even waiting for it, and thus the exception was “lost”.

My biggest hope here is that there is clear best practice on capturing exceptions in pipelines of coroutines. The only solution I can see to get exception logging closer to the exception detection is to wrap the call to “block()” in a try catch, and log there.

Without delay, the exception is thrown by the deferred1 coroutine so fast, that the deferred2 coroutine does not have chance to suspend. It just continues execution with exception. However, with delay in the deferred1, the second coroutine suspends, and then resumes with exception.

OK, I understand now. Thank you for taking the time to answer.

@elizarov when can we expect coroutines to be finalized? I am really excited to play with them.

They will be finalized when we receive enough feedback to have a confidence in all the details of their design. Meanwhile, we’ve released them in Kotlin 1.1, so that you can start using them. See here for details Can "experimental" Kotlin coroutines be used in production? - Stack Overflow

A post was split to a new topic: Functions with generic return typ