Combining "Incompatible" Contexts for Coroutines?


#1

I am a relatively naive kotlin developer originating for java, currently kicking the tires a bit on coroutines to understand its capabilities. In short, it seems there are some situations in which the contexts do not combine successfully, though no error is thrown.

One point of immediate personal interest in coroutines is the maintenance of Thread Local contexts, where the worker thread may be pooled or highly dynamic, in the case of MDC context, for example. This code attempts to use a context interceptor to synchronise a trivial thread local when a coroutine “resumes”.

sample code (trivial).

Given:

// just an executor with two threads and a blocking backing queue
val dispatchExecutor: ExecutorService = ThreadPoolExecutor(
    2, 2, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue(20), Executors.defaultThreadFactory())

// the thread local I want to maintain
val threadInt = ThreadLocal<Int>()

// an interceptor 
class InterceptingContext(val threadContextInt: Int)
  : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
    val ctx = continuation.context
    return ThreadContinuation(ctx.fold(continuation, { cont, element ->
        if (element != this@InterceptingContext && element is ContinuationInterceptor)
            element.interceptContinuation(cont) else cont
    }))
}

private inner class ThreadContinuation<in T>(val continuation: Continuation<T>) : Continuation<T> by continuation {
    override fun resume(value: T) {
        threadInt.set(threadContextInt)
        continuation.resume(value)
        threadInt.remove()
    }

    override fun resumeWithException(exception: Throwable) {
        threadInt.set(threadContextInt)
        continuation.resumeWithException(exception)
        threadInt.remove()
    }
  }
}

Attempt one using only dispatcher context:

fun main(args: Array<String>) {

runBlocking {
    val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
    val jobs = List(3) { i ->
        async(dispatcher) {
            println("${Thread.currentThread().name} beforeSuspend")
                delay(2000)
                println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
        }
    }

    jobs.forEach {
        it.join()
    }

    println("Done")
  }
}

pool-1-thread-1 @coroutine#2 beforeSuspend
pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-1 @coroutine#4 beforeSuspend
pool-1-thread-2 @coroutine#2 afterSuspend [local: 0, thread: null]
pool-1-thread-1 @coroutine#3 afterSuspend [local: 1, thread: null]
pool-1-thread-2 @coroutine#4 afterSuspend [local: 2, thread: null]
Done

Ok, now just with custom interceptor:

fun main(args: Array<String>) {

runBlocking {
    val jobs = List(3) { i ->
        val intContext: InterceptingContext = InterceptingContext(i)

        async(intContext) {
            println("${Thread.currentThread().name} beforeSuspend")
                delay(2000)
                println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
        }
    }

    jobs.forEach {
        it.join()
    }

    println("Done")
  }
}

main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done

That seems fine. “main” thread manages the coroutine dispatchers, and the interceptor is working as expected. The ScheduledExecutor feels a bit out of left field, but I imagine that is configurable somewhere.

Now, combining contexts:

fun main(args: Array<String>) {

runBlocking {
    val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
    val jobs = List(3) { i ->
        val intContext: InterceptingContext = InterceptingContext(i)

        async(dispatcher + intContext) {
        // and the rest...
}

main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
main @coroutine#1 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done

Hmm, not what I expected. But it gets a bit worse…reversing the context in combination

...        
async(intContext + dispatcher) {
...

pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-2 @coroutine#4 beforeSuspend
pool-1-thread-1 @coroutine#2 afterSuspend [local: 0, thread: null]
pool-1-thread-2 @coroutine#3 afterSuspend [local: 1, thread: null]
pool-1-thread-1 @coroutine#4 afterSuspend [local: 2, thread: null]
Done

I can only conclude that these two contexts somehow cancel each other, though I have been unable to track down the exact logic in the operator overload for “+” that is doing this. The behaviour I had been hoping for I can only achieve by nesting coroutines, like this:

...
       async(dispatcher) {
            println("${Thread.currentThread().name} beforeSuspend")
            run(intContext) {
                delay(2000)
                println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
            }
        }
...

I finally get something which demostrates multithreaded dispatch and context aware resumption:

pool-1-thread-1 @coroutine#2 beforeSuspend
pool-1-thread-2 @coroutine#3 beforeSuspend
pool-1-thread-1 @coroutine#4 beforeSuspend
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 0, thread: 0]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 1, thread: 1]
kotlinx.coroutines.ScheduledExecutor afterSuspend [local: 2, thread: 2]
Done

Ideally I would resume on a thread from the pool, but this is closer to my target. Can someone illuminate the mechanics within context combination that prevents an ExecutorService from being combined with a custom intercepting context?

And… is there a more battle tested way of synchronising thread contexts across coroutine resumptions that I have missed? I would naturally avoid using thread locals whenever possible, but in my experience, that is a fool’s errand in many real world use cases, and at the very least, logging contexts appear to have embraced this whole-heartedly.


#2

CoroutineContext is like a map from keys to values (not like a list!). Just like in a map, there can be at most one value for each key. ContinuationInterceptor companion object is a key (CoroutineContext.Key), thus you can have only one value for this key in the context. A context combination operation (+) is documented here: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/-coroutine-context/plus.html

The rationale for this specification of + is that it works just like the corresponding stdlib operation on maps (see https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/plus.html) where the entries on the right-hand size of + are added to the map on the left-hand side of +, replacing all existing entries.

You can read more in design document on Kotlin coroutines: https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md#coroutine-context

So, if you want to combine two interceptors, then you have to make it explicitly yourself. You have to query existing interceptor in the context with context[ContinuationInterceptor] and write your InterceptingContext implementation in such a way that it delegates to the previously installed interceptor.


#3

Thank you for providing an excellent suggestion.

I had indeed grokked that the context mechanism was a map and inferred from the documentation that the process of combining contexts was a merge/replace left type operation. What I see now is that the CoroutineDispatcher derived from an executor is a ContextElement with key of ContinuationInterceptor, hence the collision.

However, I am still concerned about achieving what I set out to do, namely to populate the threadLocal(s) of the final dispatch thread that handles the resume. Conceptually, I grasp the idea of a chain of interceptors, potentially chained, but when one of those interceptors switches threads, and a second inteceptor must act on the new thread, then it is less clear to me how the coroutine library might support that. I feels like I would have to completely replace the ExecutorCoroutineDispatcher with my own implementation.

Is this right? Is there an easier way to add a post-resume hook to an existing context, like the coroutine dispatcher?


#4

When you write a code to wrap another ContinuationInterceptor you have quite a lot of freedom. So, you have to invoke interceptContinuation on the original interceptor, but you have a choice of what instance of continuation to pass into there. E.g., you can wrap the continuation instance yourself first, then pass the continuation that was already wrapped to the original interceptor. This way you have a chance to execute any logic you need on the new thread.


#5

That solves it. FOr those who are interested, this seems to be the solution:

import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor  
import kotlin.coroutines.experimental.CoroutineContext

// just an coroutines.getExecutor with two threads and a blocking backing queue
val dispatchExecutor: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor(
    2, 2, 0L, TimeUnit.MILLISECONDS, ArrayBlockingQueue(20), Executors.defaultThreadFactory())

// the thread local I want to maintain
val threadInt = ThreadLocal<Int>()

fun main(args: Array<String>) {

  runBlocking {
    val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
    val jobs = List(5) { i ->
        val intContext: InterceptingContext = InterceptingContext(dispatcher, i)

        async(intContext) {
            println("${Thread.currentThread().name} beforeSuspend")
            delay(2000)
            println("${Thread.currentThread().name} afterSuspend [local: $i, thread: ${threadInt.get()}]")
        }
    }

    jobs.forEach {
        it.join()
    }

    println("Done")
  }
}

class InterceptingContext(val delegateInterceptor: ContinuationInterceptor, val threadContextInt: Int)
: AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

  override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
    val wrapped = WrappedContinuation(continuation,
            { threadInt.set(threadContextInt) },
            { threadInt.remove() })
    val delegatedContinuation = delegateInterceptor.interceptContinuation(wrapped)

    return delegatedContinuation
  }
}

class WrappedContinuation<in T>(
    val continuation: Continuation<T>,
    val preblock: () -> Unit,
    val postblock: () -> Unit)
: Continuation<T> {

override val context: CoroutineContext
    get() = continuation.context

override fun resume(value: T) {
    println("${Thread.currentThread().name} In resume")
    preblock()
    try {
        continuation.resume(value)
    } finally {
        postblock()
    }
}

override fun resumeWithException(exception: Throwable) {
    println("${Thread.currentThread().name} In resumeWithException")
    preblock()
    try {
        continuation.resumeWithException(exception)
    } finally {
        postblock()
    }
  }

}