I am a relatively naive kotlin developer originating for java, currently kicking the tires a bit on coroutines to understand its capabilities. In short, it seems there are some situations in which the contexts do not combine successfully, though no error is thrown.
One point of immediate personal interest in coroutines is the maintenance of Thread Local contexts, where the worker thread may be pooled or highly dynamic, in the case of MDC context, for example. This code attempts to use a context interceptor to synchronise a trivial thread local when a coroutine “resumes”.
sample code (trivial).
Given:
// just an executor with two threads and a blocking backing queue
val dispatchExecutor: ExecutorService = ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue(20), Executors.defaultThreadFactory())
// the thread local I want to maintain
val threadInt = ThreadLocal<Int>()
// an interceptor
class InterceptingContext(val threadContextInt: Int)
: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
val ctx = continuation.context
return ThreadContinuation(ctx.fold(continuation, { cont, element ->
if (element != this@InterceptingContext && element is ContinuationInterceptor)
element.interceptContinuation(cont) else cont
}))
}
private inner class ThreadContinuation<in T>(val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resume(value: T) {
threadInt.set(threadContextInt)
continuation.resume(value)
threadInt.remove()
}
override fun resumeWithException(exception: Throwable) {
threadInt.set(threadContextInt)
continuation.resumeWithException(exception)
threadInt.remove()
}
}
}
Attempt one using only dispatcher context:
fun main(args: Array<String>) {
runBlocking {
val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
val jobs = List(3) { i ->
async(dispatcher) {
println("${Thread.currentThread().name} beforeSuspend")
delay(2000)
println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
}
}
jobs.forEach {
it.join()
}
println("Done")
}
}
pool-1-thread-1 @coroutine#2 beforeSuspend
pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-1 @coroutine#4 beforeSuspend
pool-1-thread-2 @coroutine#2 afterSuspend [local: 0, thread: null]
pool-1-thread-1 @coroutine#3 afterSuspend [local: 1, thread: null]
pool-1-thread-2 @coroutine#4 afterSuspend [local: 2, thread: null]
Done
Ok, now just with custom interceptor:
fun main(args: Array<String>) {
runBlocking {
val jobs = List(3) { i ->
val intContext: InterceptingContext = InterceptingContext(i)
async(intContext) {
println("${Thread.currentThread().name} beforeSuspend")
delay(2000)
println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
}
}
jobs.forEach {
it.join()
}
println("Done")
}
}
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done
That seems fine. “main” thread manages the coroutine dispatchers, and the interceptor is working as expected. The ScheduledExecutor feels a bit out of left field, but I imagine that is configurable somewhere.
Now, combining contexts:
fun main(args: Array<String>) {
runBlocking {
val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
val jobs = List(3) { i ->
val intContext: InterceptingContext = InterceptingContext(i)
async(dispatcher + intContext) {
// and the rest...
}
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done
Hmm, not what I expected. But it gets a bit worse…reversing the context in combination…
...
async(intContext + dispatcher) {
...
pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-2 @coroutine#4 beforeSuspend
pool-1-thread-1 @coroutine#2 afterSuspend [local: 0, thread: null]
pool-1-thread-2 @coroutine#3 afterSuspend [local: 1, thread: null]
pool-1-thread-1 @coroutine#4 afterSuspend [local: 2, thread: null]
Done
I can only conclude that these two contexts somehow cancel each other, though I have been unable to track down the exact logic in the operator overload for “+” that is doing this. The behaviour I had been hoping for I can only achieve by nesting coroutines, like this:
...
async(dispatcher) {
println("${Thread.currentThread().name} beforeSuspend")
run(intContext) {
delay(2000)
println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
}
}
...
I finally get something which demostrates multithreaded dispatch and context aware resumption:
pool-1-thread-1 @coroutine#2 beforeSuspend
pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-1 @coroutine#4 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done
Ideally I would resume on a thread from the pool, but this is closer to my target. Can someone illuminate the mechanics within context combination that prevents an ExecutorService from being combined with a custom intercepting context?
And… is there a more battle tested way of synchronising thread contexts across coroutine resumptions that I have missed? I would naturally avoid using thread locals whenever possible, but in my experience, that is a fool’s errand in many real world use cases, and at the very least, logging contexts appear to have embraced this whole-heartedly.