RejectedExecutionException no longer propagated?



Hi all.
I’ve been happily using coroutines for several months now, but I have unexpectedly been tripped up by what appears to be a change in behaviour with respect to RejectedExecutionException.

Consider the following code:
package coroutines

import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException 
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext

// Just an executor with one thread and a bounded backing queue of capacity 1
private val testExecutor: ExecutorService = ThreadPoolExecutor(
    1, 1,
    0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue(1))

// Dispatch four coroutines onto the constrained executor
fun main(args: Array<String>) {
    val dispatcher1: CoroutineDispatcher = testExecutor.asCoroutineDispatcher()

    val deferred1: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay")
        delay(5000)
        1
    }

    val deferred2: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay2")
        delay(5000)
        1
    }

    val deferred3: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay3")
        delay(5000)
        1
    }

    val deferred4: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay4")
        delay(5000)
        1
    }

    runBlocking {
        deferred1.await()
        println("Done")
    }

    testExecutor.shutdownNow()
}


private fun <R> dispatch(dispatcher: CoroutineDispatcher,
                         block: suspend () -> R): Deferred<R> {
    return try {
        async(dispatcher) {
            block()
        }
    } catch (ree: RejectedExecutionException) {
        ree.printStackTrace()
        throw ree
    }
}

When using coroutines 0.16 and earlier, this code would reliably throw a RejectedExecutionException on the third dispatch (core pool size 1, with an ArrayBlockingQueue allowing 1 “pending” job). This was exactly the behaviour I wanted, as I’m using bounded blocking queues to throttle the number of “jobs” assigned to this node. Whether or not you agree with the architecture, a RejectedExecutionException is in fact expected in a world that has a constrained work queue.
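
For reference, the rejection itself can be reproduced without coroutines at all. The following is a minimal sketch using only java.util.concurrent; the function name `submitBlockingTasks` and the `release` latch are made up for this illustration. It builds an executor with the same shape as `testExecutor` and records whether each submitted task was accepted.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: submits `count` blocking tasks and returns,
// for each task in order, whether the executor accepted it.
fun submitBlockingTasks(count: Int): List<Boolean> {
    // Same shape as testExecutor: one thread, one queue slot,
    // and the default rejection policy (AbortPolicy).
    val executor = ThreadPoolExecutor(
        1, 1,
        0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(1))
    // Keeps accepted tasks blocked so the thread and the queue stay full.
    val release = CountDownLatch(1)
    try {
        return (1..count).map {
            try {
                executor.execute { release.await() }
                true
            } catch (ree: RejectedExecutionException) {
                false
            }
        }
    } finally {
        release.countDown() // let the accepted tasks finish
        executor.shutdown()
    }
}

fun main(args: Array<String>) {
    // prints [true, true, false, false]
    println(submitBlockingTasks(4))
}
```

The first task occupies the single thread, the second takes the single queue slot, and every later submission is rejected until one of them completes.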

When I updated to 0.17 or later, however, it seems that RejectedExecutionException is no longer being propagated! Instead, I see the following in my logs (each message is followed by the name of the thread that ran it):

Before delay pool-1-thread-1 @coroutine#1
Before delay3 kotlinx.coroutines.DefaultExecutor @coroutine#3
Before delay2 pool-1-thread-1 @coroutine#2
Before delay4 kotlinx.coroutines.DefaultExecutor @coroutine#4

Who is “kotlinx.coroutines.DefaultExecutor”, and why is it picking up jobs that I am expecting to be rejected? Then I found this code in ExecutorCoroutineDispatcherBase:

override fun dispatch(context: CoroutineContext, block: Runnable) =
    try { executor.execute(timeSource.trackTask(block)) }
    catch (e: RejectedExecutionException) {
        timeSource.unTrackTask()
        DefaultExecutor.execute(block)
    }

Why is this exception being ignored and the block silently being handed off elsewhere?
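
To make the effect of that catch block concrete, here is a minimal sketch of the same catch-and-hand-off pattern written against plain java.util.concurrent. It is not the library's implementation: `FallbackExecutor`, `runFallbackDemo`, and the thread names "primary" and "fallback" are hypothetical, chosen only to show where rejected tasks end up.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical wrapper (not library code): try the primary executor and,
// on rejection, silently hand the task to a fallback executor instead.
class FallbackExecutor(
    private val primary: Executor,
    private val fallback: Executor
) : Executor {
    override fun execute(command: Runnable) {
        try {
            primary.execute(command)
        } catch (ree: RejectedExecutionException) {
            fallback.execute(command) // the caller never sees the rejection
        }
    }
}

// Submits four blocking tasks and returns the name of the thread that ran each.
fun runFallbackDemo(): List<String> {
    val primary = ThreadPoolExecutor(
        1, 1,
        0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(1),
        ThreadFactory { r -> Thread(r, "primary") })
    val fallback = Executors.newSingleThreadExecutor(
        ThreadFactory { r -> Thread(r, "fallback") })
    val executor = FallbackExecutor(primary, fallback)

    val names = Array(4) { "" }
    val release = CountDownLatch(1) // keeps running tasks blocked during submission
    val done = CountDownLatch(4)    // counts finished tasks
    try {
        for (i in 0 until 4) {
            executor.execute(Runnable {
                names[i] = Thread.currentThread().name
                release.await()
                done.countDown()
            })
        }
        release.countDown()
        done.await()
        return names.toList()
    } finally {
        primary.shutdown()
        fallback.shutdown()
    }
}

fun main(args: Array<String>) {
    // prints [primary, primary, fallback, fallback]
    println(runFallbackDemo())
}
```

With this wrapper in place the bounded queue no longer throttles anything: the third and fourth tasks run anyway, just on a different thread, which matches the pattern in the log output above.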