Generic function to wrap callbacks with coroutines

Say I have a Callback interface as follows:

interface Callback<T> {
  fun onComplete(result: T)
  fun onException(e: Exception?)
}

My (Java) codebase is full of async methods (using ScheduledExecutors) with the following kind of signature:

fun doSomethingAsync(foo: Foo, bar: Bar, callback: Callback<Baz>) {...}

I know I can wrap it as a suspending function like this:

suspend fun doSomethingAsync(foo: Foo, bar: Bar): Baz =
  suspendCoroutine { cont ->
    doSomethingAsync(foo, bar, object : Callback<Baz> {
      override fun onComplete(result: Baz) = cont.resume(result)
      // e is nullable, so fall back to a generic exception instead of casting null
      override fun onException(e: Exception?) =
        cont.resumeWithException(e ?: IllegalStateException("Callback failed without an exception"))
    })
  }

This works, but since there are many such methods, I was wondering whether there is a general technique or pattern for doing this. One characteristic of these async functions is that the callback is always the last parameter. What throws me off is that any number of arguments can precede the callback.

Basically, I want a toSuspendFunction function such that toSuspendFunction(::functionWithCallback) removes the Callback parameter and makes the function a suspend function.

I’m guessing I have to use reflection, so any pointers on that (or any other solution) are appreciated!


You can do something like this:

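import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

interface Callback<T> {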
    fun onComplete(result: T)
    fun onException(e: Exception?)
}

suspend fun <T> awaitCallback(block: (Callback<T>) -> Unit): T =
    suspendCancellableCoroutine { cont ->
        block(object : Callback<T> {
            override fun onComplete(result: T) = cont.resume(result)
            override fun onException(e: Exception?) {
                // Always resume, even when e is null; otherwise the coroutine would never continue.
                cont.resumeWithException(e ?: IllegalStateException("Callback failed without an exception"))
            }
        })
    }


fun test1(a: Int, callback: Callback<String>) {
    Thread.sleep(1000)
    callback.onComplete(a.toString())
}

fun test2(a: String, callback: Callback<String>) {
    Thread.sleep(1000)
    callback.onComplete(a)
}

fun test3(a: Int, b: Int, callback: Callback<Int>) {
    Thread.sleep(1000)
    callback.onComplete(a + b)
}

fun <A, T> toSuspendFunction(fn: (A, Callback<T>) -> Unit): suspend (A) -> T = { a: A ->
    awaitCallback { fn(a, it) }
}

fun <A, B, T> toSuspendFunction(fn: (A, B, Callback<T>) -> Unit): suspend (A, B) -> T = { a: A, b: B ->
    awaitCallback { fn(a, b, it) }
}

fun main(args: Array<String>) = runBlocking {
    val testRes1: String = awaitCallback { test1(5, it) }
    val testRes2: String = awaitCallback { test2("test", it) }
    val testRes3: Int = awaitCallback { test3(5, 3, it) }
    val testRes4: String = toSuspendFunction(::test2)("test")
    val testRes5: Int = toSuspendFunction(::test3)(1, 2)
}

I personally use something like awaitCallback<type> { block(it) } in my codebase because I think it looks a little cleaner than wrapping the functions the way toSuspendFunction does. If you do want to use toSuspendFunction, you just need to define one overload for each number of parameters your functions take. I gave you the one- and two-parameter versions; the others should be easy to implement.
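
For illustration, the three-parameter overload follows the same pattern as the two above. This is only a sketch, assuming kotlinx.coroutines is on the classpath; joinAsync is a made-up function used purely to exercise the wrapper, and the fallback exception for a null error is an assumption, not part of the original Callback contract.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

interface Callback<T> {
    fun onComplete(result: T)
    fun onException(e: Exception?)
}

// Bridges a single callback-style call into a suspending call.
suspend fun <T> awaitCallback(block: (Callback<T>) -> Unit): T =
    suspendCancellableCoroutine { cont ->
        block(object : Callback<T> {
            override fun onComplete(result: T) = cont.resume(result)
            // Assumption: a null exception is reported as a generic failure.
            override fun onException(e: Exception?) =
                cont.resumeWithException(e ?: IllegalStateException("Callback failed without an exception"))
        })
    }

// Three leading arguments, callback last.
fun <A, B, C, T> toSuspendFunction(fn: (A, B, C, Callback<T>) -> Unit): suspend (A, B, C) -> T =
    { a: A, b: B, c: C -> awaitCallback { fn(a, b, c, it) } }

// Hypothetical callback-style function, here completing synchronously.
fun joinAsync(a: String, b: String, sep: String, callback: Callback<String>) {
    callback.onComplete(a + sep + b)
}

fun main() = runBlocking {
    val join = toSuspendFunction(::joinAsync)
    println(join("foo", "bar", "-")) // prints "foo-bar"
}
```

Each additional arity needs one more overload like this, which is the main cost compared with calling awaitCallback directly at the call site.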


Oh wow, that is beautiful. I didn’t think of that! And yeah, the awaitCallback approach looks cleaner since we don’t have to define one for each argument number. Also +100 for not having to use reflection.

I really appreciate the answer, I’ll be using that :)

Wow, this is really nice. In my codebase, the async methods have the following signature:

fun doSomethingAsync(foo: Foo, onComplete: (T) -> Unit, onException: (Throwable?) -> Unit) {...}

Now the toSuspendFunction signature becomes:

fun <A, T> toSuspendFunction (fn: (A, onComplete: (T) -> Unit,  onException: (Throwable?) -> Unit) -> Unit): suspend (A)-> T = {...}

Any idea how I can implement the awaitCallback function for this signature? Thanks in advance.
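
One possible approach, sketched under the same assumptions as the earlier answer (kotlinx.coroutines available), is to let the block receive the two lambdas instead of a Callback object. The names awaitCallbacks and parseAsync are hypothetical, and turning a null Throwable into an IllegalStateException is an assumption about how a missing error should be reported.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Like awaitCallback, but hands the block two lambdas rather than one Callback object.
suspend fun <T> awaitCallbacks(
    block: (onComplete: (T) -> Unit, onException: (Throwable?) -> Unit) -> Unit
): T = suspendCancellableCoroutine { cont ->
    block(
        { result -> cont.resume(result) },
        // Assumption: a null Throwable is reported as a generic failure.
        { e -> cont.resumeWithException(e ?: IllegalStateException("Failed without an exception")) }
    )
}

// One leading argument, followed by the two callbacks.
fun <A, T> toSuspendFunction(
    fn: (A, (T) -> Unit, (Throwable?) -> Unit) -> Unit
): suspend (A) -> T =
    { a: A -> awaitCallbacks { onComplete, onException -> fn(a, onComplete, onException) } }

// Hypothetical function in the two-lambda style, completing synchronously.
fun parseAsync(text: String, onComplete: (Int) -> Unit, onException: (Throwable?) -> Unit) {
    val number = text.toIntOrNull()
    if (number != null) onComplete(number)
    else onException(NumberFormatException("Not a number: $text"))
}

fun main() = runBlocking {
    val parse = toSuspendFunction(::parseAsync)
    println(parse("42")) // prints 42
    val failure = runCatching { parse("oops") }
    println(failure.exceptionOrNull() is NumberFormatException) // prints true
}
```

As with the Callback version, calling awaitCallbacks directly at the call site avoids writing one toSuspendFunction overload per arity.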