What I want is to ensure that a coroutine continuation is executed on a particular thread (C# does something similar by capturing the current SynchronizationContext and ensuring that Await() returns in the captured context). Here is the code that demonstrates the problem:
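    // Imports assume the experimental coroutines API (kotlin.coroutines.experimental),
    // which matches the resume()/resumeWithException() overrides used below.
    import java.util.concurrent.locks.ReentrantLock
    import kotlin.concurrent.thread
    import kotlin.coroutines.experimental.Continuation
    import kotlin.coroutines.experimental.CoroutineContext
    import kotlin.coroutines.experimental.EmptyCoroutineContext
    import kotlin.coroutines.experimental.createCoroutine
    import kotlin.coroutines.experimental.suspendCoroutine

    // Suspends the caller and resumes it from a newly spawned thread.
    suspend fun SomeAwait()
    {
        suspendCoroutine { cont: Continuation<Unit> ->
            thread {
                cont.resume(Unit)
            }
        }
    }

    fun F()
    {
        val lock = ReentrantLock()
        val cond = lock.newCondition()
        var done = false
        suspend {
            val t1 = Thread.currentThread()
            SomeAwait()
            val t2 = Thread.currentThread()
            // The continuation is expected to run on the spawned thread.
            if (t1 == t2) {
                println("t1 $t1")
                println("t2 $t2")
                throw Error("Threads are same")
            }
        }.createCoroutine(object: Continuation<Unit> {
            override val context: CoroutineContext = EmptyCoroutineContext

            // Completion callback: wake up the thread waiting in F().
            override fun resume(value: Unit)
            {
                lock.lock()
                done = true
                cond.signal()
                lock.unlock()
            }

            override fun resumeWithException(exception: Throwable)
            {
                lock.lock()
                done = true
                cond.signal()
                lock.unlock()
                throw exception
            }
        }).resume(Unit)

        // Block until the coroutine has completed.
        lock.lock()
        while (!done) {
            cond.await()
        }
        lock.unlock()
    }

    fun Run()
    {
        for (i in 1..1000) {
            F()
        }
    }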
In this test I compare the current thread before and after the SomeAwait() call. I expect them to differ, because SomeAwait() calls the continuation's resume() method in a newly spawned thread. Occasionally, however, the threads are the same, which means that suspendCoroutine() continued the coroutine immediately in the main thread instead of suspending.
I think I understand why this happens: the newly spawned thread calls resume() before the lambda passed to suspendCoroutine() actually returns. suspendCoroutine() then presumably sees that the continuation already has a result and continues immediately. By the way, is this behaviour expected, or is it a bug? I did not find detailed documentation for this case.
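To make the suspected race reproducible rather than occasional, here is a minimal sketch that forces resume() to happen before the lambda returns by joining the spawned thread inside the lambda. It assumes the stable kotlin.coroutines API (Kotlin 1.3 or later) instead of the experimental one used above, and the names resumeBeforeBlockReturns and sameThreadAfterEarlyResume are made up for illustration.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: resumes from another thread, but joins that thread
// inside the block, so resume() has always run before the block returns.
suspend fun resumeBeforeBlockReturns() {
    suspendCoroutine<Unit> { cont ->
        thread { cont.resume(Unit) }.join()
    }
}

// Hypothetical helper: reports whether the code after the suspension point
// ran on the same thread as the code before it.
fun sameThreadAfterEarlyResume(): Boolean {
    var same = false
    suspend {
        val before = Thread.currentThread()
        resumeBeforeBlockReturns()
        same = before === Thread.currentThread()
    }.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    return same
}

fun main() {
    // If the explanation above is right, the coroutine never suspends
    // and the check runs on the calling thread.
    println(sameThreadAfterEarlyResume())
}
```

If this prints true every time, it supports the explanation above: a result delivered before the lambda returns is handed straight back to the caller, so no thread switch takes place.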
So the question is: what is the proper pattern for ensuring that a coroutine resumes on a particular thread (which is essential when developing asynchronous frameworks)?
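For reference, the closest standard-library analogue to a captured SynchronizationContext that I am aware of is a ContinuationInterceptor placed in the coroutine context. The following is only a sketch under that assumption, again using the stable kotlin.coroutines API (Kotlin 1.3 or later); ExecutorInterceptor, someAwait and runOnCapturedThread are hypothetical names, and a single-thread executor stands in for "the particular thread".

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor: wraps each continuation so that every resumption
// is submitted to the given executor instead of running on the resuming thread.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Same idea as SomeAwait() above: resume from a newly spawned thread.
suspend fun someAwait() {
    suspendCoroutine<Unit> { cont -> thread { cont.resume(Unit) } }
}

// Starts a coroutine under the interceptor and reports whether the code
// before and after someAwait() ran on the same (executor) thread.
fun runOnCapturedThread(): Boolean {
    val executor = Executors.newSingleThreadExecutor()
    val done = CountDownLatch(1)
    var same = false
    suspend {
        val t1 = Thread.currentThread()
        someAwait()
        same = t1 === Thread.currentThread()
    }.startCoroutine(Continuation<Unit>(ExecutorInterceptor(executor)) { done.countDown() })
    done.await()          // wait for the coroutine to complete
    executor.shutdown()
    return same
}

fun main() {
    println(runOnCapturedThread())
}
```

In this sketch the goal is inverted compared with the test above: both halves of the coroutine should run on the executor's thread, whether suspendCoroutine() actually suspends or returns immediately. Whether this is the recommended pattern for framework code is exactly what I would like confirmed.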