Hi all.
I’ve been happily using coroutines for several months now, but was unexpectedly tripped up by what appears to be a change of behaviour with respect to RejectedExecutionException.
Consider the following code:
package coroutines
import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext
// just an executor with a single thread and a bounded blocking queue (capacity 1)
private val testExecutor: ExecutorService = ThreadPoolExecutor(
    1, 1,
    0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue(1))
// the thread local I want to maintain
fun main(args: Array<String>) {
    val dispatcher1: CoroutineDispatcher = testExecutor.asCoroutineDispatcher()
    val deferred1: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay")
        delay(5000)
        1
    }
    val deferred2: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay2")
        delay(5000)
        1
    }
    val deferred3: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay3")
        delay(5000)
        1
    }
    val deferred4: Deferred<Int> = dispatch(dispatcher1) {
        println("Before delay4")
        delay(5000)
        1
    }
    runBlocking {
        deferred1.await()
        println("Done")
    }
    testExecutor.shutdownNow()
}

private fun <R> dispatch(dispatcher: CoroutineDispatcher,
                         block: suspend () -> R): Deferred<R> {
    return try {
        async(dispatcher) {
            block()
        }
    } catch (ree: RejectedExecutionException) {
        ree.printStackTrace()
        throw ree
    }
}
When using coroutines 0.16 and earlier, this code would reliably throw a RejectedExecutionException on the third dispatch (core pool size 1, with an ArrayBlockingQueue allowing 1 “pending” job). This was exactly the behaviour I wanted, as I’m using bounded blocking queues to throttle the number of “jobs” assigned to this node. Whether or not you agree with the architecture, a RejectedExecutionException is in fact expected in a world that has a constrained work queue.
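For reference, here is a minimal sketch of the rejection behaviour I am relying on, using only java.util.concurrent and no coroutines at all. The function name countRejections and the latch are just for illustration; the executor is configured the same way as testExecutor above.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: submits `submissions` blocking tasks and counts rejections.
fun countRejections(submissions: Int): Int {
    // One worker thread, one queue slot: at most two tasks can be accepted at once.
    val executor = ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(1))
    val release = CountDownLatch(1)
    var rejected = 0
    try {
        repeat(submissions) {
            try {
                // Each task blocks until released, so the worker stays busy.
                executor.execute { release.await() }
            } catch (e: RejectedExecutionException) {
                rejected++
            }
        }
    } finally {
        release.countDown()
        executor.shutdown()
    }
    return rejected
}

fun main(args: Array<String>) {
    println(countRejections(4)) // prints 2
}
```

The first task occupies the only thread, the second fills the only queue slot, and the third and fourth are rejected by the default AbortPolicy.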
When I updated to 0.17 or later, however, it seems that RejectedExecutionException is no longer being propagated! Instead, I see the following dispatcher show up in my logs:
Before delay pool-1-thread-1 @coroutine#1
Before delay3 kotlinx.coroutines.DefaultExecutor @coroutine#3
Before delay2 pool-1-thread-1 @coroutine#2
Before delay4 kotlinx.coroutines.DefaultExecutor @coroutine#4
Who is “kotlinx.coroutines.DefaultExecutor”, and why is it picking up jobs that I am expecting to be rejected? I then found this code in ExecutorCoroutineDispatcherBase:
override fun dispatch(context: CoroutineContext, block: Runnable) =
    try { executor.execute(timeSource.trackTask(block)) }
    catch (e: RejectedExecutionException) {
        timeSource.unTrackTask()
        DefaultExecutor.execute(block)
    }
Why is this exception being ignored and the block silently being handed off elsewhere?
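To make the effect of that catch block concrete, here is a rough sketch of the same "fall back on rejection" pattern using plain executors. This is not the library's code: executeWithFallback, threadsUsed, and the thread names "primary" and "fallback" are all made up for illustration, and the fallback executor merely stands in for DefaultExecutor.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: try the primary executor, hand off to the fallback on rejection.
fun executeWithFallback(primary: Executor, fallback: Executor, task: Runnable) {
    try {
        primary.execute(task)
    } catch (e: RejectedExecutionException) {
        fallback.execute(task) // the caller never sees the rejection
    }
}

// Returns, per task index, the name of the thread that ended up running it.
fun threadsUsed(submissions: Int): List<String> {
    val primary = ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue<Runnable>(1),
        ThreadFactory { r -> Thread(r, "primary") })
    val fallback = Executors.newSingleThreadExecutor { r -> Thread(r, "fallback") }
    val release = CountDownLatch(1)
    val names = ConcurrentHashMap<Int, String>()
    for (i in 0 until submissions) {
        executeWithFallback(primary, fallback, Runnable {
            release.await() // keep the workers busy until every task is submitted
            names[i] = Thread.currentThread().name
        })
    }
    release.countDown()
    primary.shutdown()
    fallback.shutdown()
    primary.awaitTermination(5, TimeUnit.SECONDS)
    fallback.awaitTermination(5, TimeUnit.SECONDS)
    return (0 until submissions).map { names.getValue(it) }
}

fun main(args: Array<String>) {
    println(threadsUsed(4)) // prints [primary, primary, fallback, fallback]
}
```

With this pattern the bounded queue no longer throttles anything: every task beyond the first two is quietly run elsewhere, which matches what I see in my logs.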