Ensure continuation thread for coroutine


#1

What I want is to ensure that coroutine continuation is executed in a particular thread (similar is done in C# by capturing current SynchronizationContext and ensuring Await() returns in the captured context). Here is the code which discovers the problem:

    suspend fun SomeAwait()
    {
        suspendCoroutine {
            cont: Continuation<Unit> ->

            thread {
                cont.resume(Unit)
            }
        }
    }

    fun F()
    {
        val lock = ReentrantLock()
        val cond = lock.newCondition()
        var done = false

        suspend {
            val t1 = Thread.currentThread()
            SomeAwait()
            val t2 = Thread.currentThread()
            if (t1 == t2) {
                println("t1 $t1")
                println("t2 $t2")
                throw Error("Threads are same")
            }
        }.createCoroutine(object: Continuation<Unit> {
            override val context: CoroutineContext = EmptyCoroutineContext

            override fun resume(value: Unit)
            {
                lock.lock()
                done = true
                cond.signal()
                lock.unlock()
            }

            override fun resumeWithException(exception: Throwable)
            {
                lock.lock()
                done = true
                cond.signal()
                lock.unlock()
                throw exception
            }
        }).resume(Unit)

        lock.lock()
        while (!done) {
            cond.await()
        }
        lock.unlock()
    }

    fun Run()
    {
        for (i in 1..1000) {
            F()
        }
    }

In this test I compare current thread before SomeAwait() call and after. I expect they differ because SomeAwait() calls continuation resume() method in a newly spawned thread. However, occasionally, the threads are the same which means that suspendCoroutine() continued coroutine instantly in the main thread instead of suspending.

Probably, I understand why it happens - newly spawned thread calls resume() before lambda function passed to suspendCoroutine() actually returns. It then probably checks that the continuation is ready, and continues instantly. BTW, is this behaviour expected or may be it is a bug? I did not found detailed documentation for this case.

So the question is - what is the proper pattern for ensuring the coroutine will resume in a particular thread (which is essential for asynchronous frameworks development)?


#2

Found information about ContinuationInterceptor but seems that it is used to intercept all continuations in a coroutine. What if I want to individually control each particular continuation?


#3

Are you seen https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#dispatchers-and-threads ?


#4

Yes, this is good point. There is also this example. I checked the implementation and found a simpler solution for this particular case:

    suspend fun SomeAwait()
    {
        suspendCoroutineUninterceptedOrReturn {
            cont: Continuation<Unit> ->

            thread {
                cont.resume(Unit)
            }

            COROUTINE_SUSPENDED
        }
    }

This lower-level API ensures that calling thread is always suspended.