Best practice for coroutine that reports progress

Some tasks take a long time to complete. In a graphical UI, it’s helpful to display a progress indicator (progress bar) so that the user knows how much longer processing will take.

In .NET, there is the interface IProgress<T> for this purpose. The idea is that the caller passes an instance of IProgress to the coroutine, which then periodically reports its progress back.

Here’s how this concept might be applied to Kotlin:

import kotlinx.coroutines.*

interface Progress {
    // Reports a progress between 0 and 1
    fun report(value: Double)
}

// Upper-cases the given string, reporting progress.
// This is a stand-in for a more time-consuming task.
suspend fun toUpper(input: String, progress: Progress): String {
    var result = ""
    input.forEachIndexed { i, c ->
        delay(100) // Simulate a more complex calculation
        result += c.uppercaseChar() // toUpperCase() is deprecated in current Kotlin
        progress.report((i + 1).toDouble() / input.length)
    }
    return result
}

fun main() = runBlocking {
    val progress = object : Progress {
        override fun report(value: Double) {
            println("Progress: ${(value * 100).toInt()}%")
        }
    }
    val result = toUpper("Hello world!", progress)
    println("Result: $result")
}

This works as expected, but I’m not sure whether this is the best approach in Kotlin. In toUpper(), progress is an extra parameter – that is, an input to the function – even though it really acts as an output.

In fact, toUpper has two output values: a stream of numbers that flows throughout its execution, and a string that is returned on completion.

Is there a more idiomatic way to express this idea in Kotlin?

I think it would look nicer to pass a lambda, progress: (Double) -> Unit, instead of defining an interface:

val result = toUpper("Hello world!") { progress ->
    println("Progress: $progress")
}

In that case you don’t need to define an interface or an anonymous object, so it looks a bit cleaner.
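
For completeness, a minimal sketch of what the function side might look like with a function-type parameter. It assumes the same toUpper stand-in from the question; only the parameter type changes, and the StringBuilder is just a tidier way to build the string.

```kotlin
import kotlinx.coroutines.*

// Same stand-in task as in the question, taking a function type instead of an interface.
suspend fun toUpper(input: String, progress: (Double) -> Unit): String {
    val result = StringBuilder()
    input.forEachIndexed { i, c ->
        delay(100) // Simulate a more complex calculation
        result.append(c.uppercaseChar())
        progress((i + 1).toDouble() / input.length) // Value between 0 and 1
    }
    return result.toString()
}

fun main() = runBlocking {
    // The trailing lambda takes the place of the anonymous Progress object.
    val result = toUpper("Hello world!") { progress ->
        println("Progress: ${(progress * 100).toInt()}%")
    }
    println("Result: $result")
}
```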

Thanks for the suggestion! A lambda would certainly make the usage more elegant.

However, the real point of my question was whether I can avoid passing the progress object as an argument.

My initial thought would be to create your own continuation (or maybe a ContinuationInterceptor?) that takes a Progress (or lambda) that would be updated when resume and resumeWithException are called. I don’t have much experience with custom coroutines so it may not be that simple 🤷
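
A related mechanism that doesn't need a custom continuation is a coroutine context element. This is a different technique from the one described above, sketched only as an illustration: ProgressReporter and reportProgress are hypothetical names, not part of kotlinx.coroutines. The callback travels in the coroutine context, so toUpper no longer takes a progress parameter.

```kotlin
import kotlin.coroutines.*
import kotlinx.coroutines.*

// Hypothetical context element that carries the progress callback.
class ProgressReporter(val report: (Double) -> Unit) :
    AbstractCoroutineContextElement(ProgressReporter) {
    companion object Key : CoroutineContext.Key<ProgressReporter>
}

// Hypothetical helper: reports progress if a reporter is installed, otherwise does nothing.
suspend fun reportProgress(value: Double) {
    coroutineContext[ProgressReporter]?.report?.invoke(value)
}

suspend fun toUpper(input: String): String {
    val result = StringBuilder()
    input.forEachIndexed { i, c ->
        delay(100) // Simulate a more complex calculation
        result.append(c.uppercaseChar())
        reportProgress((i + 1).toDouble() / input.length)
    }
    return result.toString()
}

fun main() = runBlocking {
    val reporter = ProgressReporter { println("Progress: ${(it * 100).toInt()}%") }
    // The caller installs the reporter in the context instead of passing it as an argument.
    val result = withContext(reporter) { toUpper("Hello world!") }
    println("Result: $result")
}
```

The trade-off is that the dependency becomes implicit: nothing in toUpper's signature says that it reports progress, and a caller that forgets to install a reporter silently gets none.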

Create a channel or a flow and let one coroutine be the producer of some, say, percentage, while a second coroutine (maybe the one that handles UI) can listen to the channel/flow and do something with it.

Have a look here and here.

@arocnies That’s just my problem: It feels like all the ingredients should be already there, but I can’t figure out just how to combine them.

@lamba92 A channel or flow would certainly be a good representation for the progress values. However, this solves only half the problem. The other half is that once progress is complete, the function still has to return its actual result.

Somehow, the function should synchronously return a Flow<Double>, then asynchronously return the actual result.

Could it just synchronously return two Flows, the first of Double and the second of the result type? The first would give you multiple values, whereas the second would give you only one. (Or maybe I’m confusing how flows work with my passing understanding of ReactiveX-style programming?)

Here’s a generic approach that bundles progress and result together into a Flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*

sealed class ResultOrProgress<R, P> {
    data class Result<R, P>(val result: R): ResultOrProgress<R, P>()
    data class Progress<R, P>(val progress: P): ResultOrProgress<R, P>()
}

typealias FlowForResult<R, P> = Flow<ResultOrProgress<R, P>>

suspend fun <R, P> FlowCollector<ResultOrProgress<R, P>>.emitProgress(progress: P) = emit(ResultOrProgress.Progress(progress))
suspend fun <R, P> FlowCollector<ResultOrProgress<R, P>>.emitResult(result: R) = emit(ResultOrProgress.Result(result))

suspend fun <R, P> SendChannel<ResultOrProgress<R, P>>.sendProgress(progress: P) = send(ResultOrProgress.Progress(progress))
suspend fun <R, P> SendChannel<ResultOrProgress<R, P>>.sendResult(result: R) = send(ResultOrProgress.Result(result))

// Non-suspending variants. trySend replaces offer, which is deprecated in current kotlinx.coroutines.
fun <R, P> SendChannel<ResultOrProgress<R, P>>.offerProgress(progress: P) = trySend(ResultOrProgress.Progress(progress)).isSuccess
fun <R, P> SendChannel<ResultOrProgress<R, P>>.offerResult(result: R) = trySend(ResultOrProgress.Result(result)).isSuccess

// Calls action for every progress value and returns the single result.
private suspend inline fun <R, P> FlowForResult<R, P>.collectForResult(crossinline action: suspend (value: P) -> Unit): R {
    return onEach {
        if (it is ResultOrProgress.Progress<R, P>) action(it.progress)
    }.filterIsInstance<ResultOrProgress.Result<R, P>>().single().result
}

fun toUpper(input: String): FlowForResult<String, Double> = flow {
    var result = ""
    input.forEachIndexed { i, c ->
        //delay(100) // Simulate a more complex calculation
        result += c.uppercaseChar() // toUpperCase() is deprecated in current Kotlin
        emitProgress((i + 1).toDouble() / input.length)
    }
    emitResult(result)
}

fun main() = runBlocking {
    val result = toUpper("Hello world!").collectForResult {
        println("Progress: ${(it * 100).toInt()}%")
    }
    println("Result: $result")
}

That looks nice! The downside to this approach, as I see it, is that it reduces compile-time safety. For a regular (synchronous or asynchronous) function, the compiler can guarantee that if the function returns normally, it will return exactly one result. With this approach, emitResult is just another function. It’s possible to forget to call it, to skip it under certain conditions, or to call it repeatedly.

I believe that the only safe implementation would be to use a regular return for the final result. That would require a custom higher-order function similar to flow that expects a lambda block returning the final result. To allow progress indication, the scope of this block would be an object with a reportProgress method.

This still leaves open the question of what exactly this higher-order function would produce. One solution might be to return an object that implements both Deferred<TResult> and Progress. However, if my understanding is correct, such a signature would not allow for structured concurrency.

At this point, I wonder if I shouldn’t just go with my original approach, maybe using a function type instead of the interface. Yes, I’d have to pass the Progress object into the function, which is counter-intuitive, but on the other hand, the function signatures would be simple and easily understood.

If requiring a return in toUpper() is what you want, you could have the lambda return a result and emit that return value from the higher-order function.

You could require that the lambda return a result, which then gets emitted exactly once. You could further restrict emitting results to the higher-order function, flowForRequiredResult.

I imagine the usage would be something like this:

fun <R, P> flowForRequiredResult(
    block: suspend FlowCollector<ResultOrProgress<R, P>>.() -> R
): FlowForResult<R, P> = flow {
    val result = block() // Progress is emitted within the block
    emitResult(result) // Returned value is emitted as result
}

fun toUpper(input: String): FlowForResult<String, Double> = flowForRequiredResult {
    var result = ""
    input.forEachIndexed {
        emitProgress(...)
    }
    return@flowForRequiredResult result // Required to return a result here
}

fun main() = runBlocking {
    val result = toUpper("Hello world!").collectForResult {
        println("Progress: ${(it * 100).toInt()}%")
    }
    println("Result: $result")
}

@arocnies, flowForRequiredResult still has the issue of being able to return early with a call to emitResult.

@Daniel_Wolf, if calling emitResult incorrectly, or forgetting to call it, is the concern, then the emitProgress and emitResult extension methods could be removed and replaced with this:


interface FlowProgressCollector<P> {
    suspend fun emitProgress(progress: P)
}

fun <R, P> flowForResult(block: suspend FlowProgressCollector<P>.() -> R): FlowForResult<R, P> {
    return flow<ResultOrProgress<R, P>> {
        val collector = object : FlowProgressCollector<P> {
            override suspend fun emitProgress(progress: P) {
                emit(ResultOrProgress.Progress(progress))
            }
        }
        val result = collector.block()
        emit(ResultOrProgress.Result(result))
    }
}

A similar interface and method could be created on top of channelFlow or other Flow factory methods.
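
As a rough sketch of what the channelFlow variant might look like: ProgressScope and channelFlowForResult are hypothetical names, and ResultOrProgress is redeclared so that the snippet compiles on its own. The block can only report progress, and its return value becomes the single result.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Redeclared here so the snippet compiles on its own.
sealed class ResultOrProgress<R, P> {
    data class Result<R, P>(val result: R) : ResultOrProgress<R, P>()
    data class Progress<R, P>(val progress: P) : ResultOrProgress<R, P>()
}

// Hypothetical receiver type: exposes progress reporting only.
interface ProgressScope<P> {
    suspend fun reportProgress(progress: P)
}

// Hypothetical builder on top of channelFlow.
fun <R, P> channelFlowForResult(
    block: suspend ProgressScope<P>.() -> R
): Flow<ResultOrProgress<R, P>> = channelFlow<ResultOrProgress<R, P>> {
    val scope = object : ProgressScope<P> {
        override suspend fun reportProgress(progress: P) {
            send(ResultOrProgress.Progress(progress))
        }
    }
    // The block's return value is the only result that is ever sent.
    send(ResultOrProgress.Result(scope.block()))
}

fun toUpper(input: String): Flow<ResultOrProgress<String, Double>> =
    channelFlowForResult<String, Double> {
        val result = StringBuilder()
        input.forEachIndexed { i, c ->
            result.append(c.uppercaseChar())
            reportProgress((i + 1).toDouble() / input.length)
        }
        result.toString() // Last expression is the required result
    }

fun main() = runBlocking {
    toUpper("Hello world!").collect { event ->
        when (event) {
            is ResultOrProgress.Progress -> println("Progress: ${(event.progress * 100).toInt()}%")
            is ResultOrProgress.Result -> println("Result: ${event.result}")
        }
    }
}
```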

PS: Looking back, defining an offerResult method was pretty silly.

For a similar task, I used a Channel:

val progressChannel = Channel<Int>(Channel.CONFLATED)
launch(Dispatchers.Main) {
    for (pos in progressChannel) {
        delay(200)
        progressBar.progressPos = pos
    }
}

A conflated channel ensures that only the most recent value is kept, and the delay in the UI loop ensures that the UI is updated at a reasonable frequency (at most 5 Hz in this example).
The computing function posts to the channel in its own reasonable steps and doesn’t care about the UI.

It’s debatable whether the progress channel counts as an input or an output. I pass it to the computing function as a parameter.
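
A minimal sketch of the producer side, assuming the toUpper stand-in from the question and printing instead of updating a real progress bar. The function signature and the close/join handling are illustrative assumptions, not the code from my project.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical computing function: posts percentages and never waits for the consumer.
suspend fun toUpper(input: String, progress: SendChannel<Int>): String {
    val result = StringBuilder()
    input.forEachIndexed { i, c ->
        delay(100) // Simulate a more complex calculation
        result.append(c.uppercaseChar())
        // On a conflated channel, trySend overwrites any value not yet consumed.
        progress.trySend((i + 1) * 100 / input.length)
    }
    return result.toString()
}

fun main() = runBlocking {
    val progressChannel = Channel<Int>(Channel.CONFLATED)
    // Stand-in for the UI coroutine; a real UI would use Dispatchers.Main.
    val ui = launch {
        for (pos in progressChannel) {
            println("Progress: $pos%")
            delay(200) // Throttle updates
        }
    }
    val result = toUpper("Hello world!", progressChannel)
    progressChannel.close() // Ends the for loop once the last value is consumed
    ui.join()
    println("Result: $result")
}
```

Because intermediate values can be dropped, the consumer usually sees fewer updates than the producer sends.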

Thank you all for your great input! I’ve decided that for the time being, I’ll keep it simple and explicitly pass the progress object into the function. I’m only just learning Kotlin coroutines, and they are sufficiently different from async functions in C# and JavaScript that I feel I was trying to run before I could walk.

Once I’ve got a better practical understanding, I’ll revisit @nickallendev’s last proposal and see if refactoring my code makes it better.