How to continue a suspend function in a dynamic proxy in the same coroutine?

I want to continue a suspend function in a dynamic proxy in the same coroutine.
Please have a look at the following code:

interface Adder {
    suspend fun add(a: Int, b: Int): Int
}

val IH = InvocationHandler { _, method, args ->
    val continuation = args.last() as Continuation<*>
    val realArgs = args.take(args.size - 1)
    println("${method.name}$realArgs")
    GlobalScope.launch {
        delay(5_000)
        @Suppress("UNCHECKED_CAST") (continuation as Continuation<Int>).resume(3)
    }
    COROUTINE_SUSPENDED
}

fun main() {
    val adder = Proxy.newProxyInstance(
        Adder::class.java.classLoader, arrayOf(Adder::class.java), IH
    ) as Adder
    runBlocking {
        println(adder.add(1, 2))
    }
}

It works fine. It runs the delay function in a new coroutine.
However, that’s not what I want.

I want to run the InvocationHandler in the same coroutine as the one that was started with runBlocking.
Something like:

val IH = InvocationHandler { _, _, _ ->
    delay(5_000)
    3
}

This obviously won’t compile because delay is a suspend function that must be run in a coroutine.
So the question is: How could I write the InvocationHandler for my intended behavior?
Any help would be very much appreciated.

I’d like to use this code in my RPC framework.
My real code would replace the delay call with non-blocking Ktor socket calls for serializing the data over the wire.
You can find the code example at: https://raw.githubusercontent.com/softappeal/yass/master/kotlin/yass/test/ch/softappeal/yass/remote/SuspendProxy.kt

I’ve found a solution for my problem:

package ch.softappeal.yass

import kotlinx.coroutines.*
import java.lang.reflect.*
import kotlin.coroutines.*
import kotlin.test.*

typealias SuspendInvoker = suspend (method: Method, arguments: List<Any?>) -> Any?

private interface SuspendFunction {
    suspend fun invoke(): Any?
}

private val SuspendRemover = SuspendFunction::class.java.methods[0]

@Suppress("UNCHECKED_CAST")
fun <C : Any> proxy(contract: Class<C>, invoker: SuspendInvoker): C =
    Proxy.newProxyInstance(contract.classLoader, arrayOf(contract)) { _, method, arguments ->
        val continuation = arguments.last() as Continuation<*>
        val argumentsWithoutContinuation = arguments.take(arguments.size - 1)
        SuspendRemover.invoke(object : SuspendFunction {
            override suspend fun invoke() = invoker(method, argumentsWithoutContinuation)
        }, continuation)
    } as C

interface Adder {
    suspend fun add(a: Int, b: Int): Int
}

class SuspendProxyTest {
    @Test
    fun test() {
        val adder = proxy(Adder::class.java) { method, arguments ->
            println("${method.name}$arguments")
            delay(100)
            3
        }
        runBlocking { assertEquals(3, adder.add(1, 2)) }
    }
}

Any comments?
Is this a good/problematic solution?
Could/should the “removing of suspend functionality” be added to the kotlin.coroutines library?

2 Likes

This was just what I needed, thanks!

It does seem like there should be a better way, though.

@nanodeath Thanks.

There is a more mature version at https://github.com/softappeal/yass2/blob/v0.4.0/src/jvmMain/kotlin/ch/softappeal/yass2/InterceptorReflection.kt

If you know of a better solution, please let me know.

1 Like

I also searched here and there to solve a similar problem, but I didn’t get a satisfactory answer. So, in the end, I found the answer myself.
This is my answer.

interface Adder {
    suspend fun add(a: Int, b: Int): Int
}

@Suppress("FunctionName", "NAME_SHADOWING", "UNCHECKED_CAST")
fun SuspendInvocationHandler(block: suspend (proxy: Any, method: Method, args: Array<*>?) -> Any?) =
    InvocationHandler { proxy, method, args ->
        val cont = args?.lastOrNull() as? Continuation<*>
        if (cont == null) {
            val args = args.orEmpty()

            runBlocking {
                block(proxy, method, args)
            }
        } else {
            val args = args.dropLast(1).toTypedArray()
            val suspendInvoker = block as (Any, Method, Array<*>?, Continuation<*>) -> Any?

            suspendInvoker(proxy, method, args, cont)
        }
    }



fun main() {
    val IH = SuspendInvocationHandler { proxy, method, args ->
        delay(5_000)
        3
    }
    val adder = Proxy.newProxyInstance(
        Adder::class.java.classLoader, arrayOf(Adder::class.java), IH
    ) as Adder

    runBlocking {
        println(adder.add(1, 2))
    }
}
1 Like

This URL seemed to be a broken.
I share the new URL.

Also, I personally felt it would be more type safe to write the following

fun <T> invokeSuspendFunction(
    continuation: Continuation<*>,
    block: suspend () -> T
): T = @Suppress("UNCHECKED_CAST") (block as (Continuation<*>) -> T)(continuation)

override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
    if (/* some condition */)  {
        val continuation = args!!.last() as Continuation<*>
        return invokeSuspendFunction(continuation) { /* some suspend call */ }
    }
    TODO("Not yet implemented")
}
1 Like