Approaches for wrapping a Java thread-based API in a suspend function

I am trying to create a Kotlin wrapper around a Java thread-based API. I have come up with two approaches and would like some suggestions on which one to choose.

Let’s say I have a Java API that performs an asynchronous operation on a thread and returns the result via a callback, as below. To keep the example simple it is written in Kotlin here, but assume that it is in Java.

import kotlin.concurrent.thread

// Assume that this class is thread safe.
class Operation {
    fun cancel() {
        // Cancel the long-running operation
    }

    fun execute(onSuccess: (Int) -> Unit, onFailure: (Exception) -> Unit) {
        // Runs on a background thread and reports the result through a callback
        thread(name = "suspend") {
            Thread.sleep(1000)
            onSuccess(50)
        }
    }
}

To create a suspending wrapper over it, I can think of the following two approaches:

1. With CompletableDeferred

suspend fun awaitResult(): Int = coroutineScope {
    val deferred = CompletableDeferred<Int>(this.coroutineContext[Job])
    val operation = Operation()
    operation.execute(
        onSuccess = {
            deferred.complete(it)
        },
        onFailure = {
            deferred.completeExceptionally(it)
        }
    )
    try {
        deferred.await()
    } catch (e: CancellationException) {
        operation.cancel()
        throw e
    }
}

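2. With suspendCancellableCoroutine

import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun awaitResult(): Int = suspendCancellableCoroutine { continuation ->
    val operation = Operation()
    // Cancel the underlying operation if the coroutine is cancelled while suspended
    continuation.invokeOnCancellation {
        operation.cancel()
    }
    operation.execute(
        onSuccess = {
            continuation.resume(it)
        },
        onFailure = {
            continuation.resumeWithException(it)
        }
    )
}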

Both seem to work as expected. However, Kotlin itself uses suspendCancellableCoroutine internally for some of its suspend functions, and Retrofit also uses it to support suspend functions in services. So I think I am missing something here, and I would love to hear what the difference is and which approach you would suggest.

Also, I read in one thread that Continuation is an internal implementation detail and should not be used outside the library. Does that still hold true?

It’s not an implementation detail. It’s part of the public API. suspendCancellableCoroutine is specifically designed for your use case and it is appropriate to use.

Note that your two implementations are not really equivalent in error cases.

For example, consider:

val myJob = GlobalScope.launch {
    try {
        println(awaitResult())
    } catch (e: SomeException) {
        println(awaitResult())
    }
}

The suspendCancellableCoroutine implementation will not fail myJob if there’s an error. The exception is simply caught.

However, because CompletableDeferred is created as a regular (no supervisor jobs) descendant of myJob, the call to deferred.completeExceptionally(it) will immediately fail myJob.

I recommend sticking with suspendCancellableCoroutine.
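To illustrate why suspendCancellableCoroutine fits this use case, here is a minimal sketch of the cancellation path. SlowOperation, its delayMs parameter and its cancelled flag are hypothetical stand-ins for the real Java API, made up only so the effect of invokeOnCancellation can be observed.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical stand-in for the Java API; it records whether cancel() was called.
class SlowOperation(private val delayMs: Long) {
    val cancelled = AtomicBoolean(false)

    fun cancel() {
        cancelled.set(true)
    }

    fun execute(onSuccess: (Int) -> Unit, onFailure: (Exception) -> Unit) {
        // Daemon thread so a cancelled operation does not keep the JVM alive.
        thread(isDaemon = true, name = "slow-operation") {
            try {
                Thread.sleep(delayMs)
                if (!cancelled.get()) onSuccess(50)
            } catch (e: Exception) {
                onFailure(e)
            }
        }
    }
}

// Same shape as the suspendCancellableCoroutine wrapper from the question,
// but the operation is passed in so the caller can inspect it afterwards.
suspend fun awaitResult(operation: SlowOperation): Int =
    suspendCancellableCoroutine { continuation ->
        continuation.invokeOnCancellation { operation.cancel() }
        operation.execute(
            onSuccess = { continuation.resume(it) },
            onFailure = { continuation.resumeWithException(it) }
        )
    }

fun main() = runBlocking {
    // Normal completion: the callback resumes the coroutine with the value.
    println(awaitResult(SlowOperation(delayMs = 100)))

    // Timeout: the coroutine is cancelled while suspended, which runs the
    // invokeOnCancellation handler and therefore operation.cancel().
    val slow = SlowOperation(delayMs = 2_000)
    val result = withTimeoutOrNull(200) { awaitResult(slow) }
    println("result=$result, cancel() called=${slow.cancelled.get()}")
}
```

The point of the sketch is that the caller needs no try/catch for cancellation: the handler is registered once on the continuation and runs whenever the coroutine is cancelled while it is suspended.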

Thanks @nickallendev for the response.

I tried the example you suggested. In both cases, the exception is caught in the catch clause and myJob does not fail.

This is something I tried:

class Operation {
    ...

    fun execute(onSuccess: (Int) -> Unit, onFailure: (Exception) -> Unit) {
        thread(name = "suspend") {
            sleep(1000)
            onFailure(IllegalStateException()) // Fail the operation by reporting an exception through the callback
        }
    }
}


fun main() = runBlocking {
    val myJob = GlobalScope.launch {
        try {
            println(awaitResult())
        } catch (e: Exception) {
            println("Caught exception: $e")
            println(awaitResult())
        }
    }
    myJob.join()
    println("Done")
}

I think myJob does not fail because it is launched in GlobalScope, so there is no structured concurrency. If I just use launch {} instead, myJob fails in both cases.

Am I missing something?

No, it was my mistake. I missed that coroutineScope creates its own Job, which rethrows a failure to the caller instead of failing myJob directly, so completing the CompletableDeferred exceptionally does not fail myJob either.
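For anyone who wants to check this, here is a small sketch of the CompletableDeferred variant failing inside a plain launch. failingAwait is a hypothetical helper that stands in for awaitResult with a failing operation; it is not code from the posts above.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical helper: the deferred is a child of the coroutineScope's Job and is
// failed from another thread, like the onFailure callback in the question.
suspend fun failingAwait(): Int = coroutineScope {
    val deferred = CompletableDeferred<Int>(this.coroutineContext[Job])
    thread(isDaemon = true) {
        Thread.sleep(100)
        deferred.completeExceptionally(IllegalStateException("boom"))
    }
    deferred.await()
}

fun main() = runBlocking {
    var caught: Throwable? = null
    // A plain launch, so myJob is a child of runBlocking (structured concurrency).
    val myJob = launch {
        try {
            failingAwait()
        } catch (e: IllegalStateException) {
            // coroutineScope rethrows the failure here, where it can be handled.
            caught = e
        }
        println("myJob still active after catch: $isActive")
    }
    myJob.join()
    println("caught=${caught?.message}, myJob cancelled=${myJob.isCancelled}")
}
```

If the reasoning above holds, the exception surfaces at the failingAwait() call site, myJob stays active after the catch, and it finishes without being cancelled. myJob only fails when something throws without being caught, such as the second awaitResult() call inside the catch block of the earlier example.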