Difficulty seeing a speedup when using coroutines

I’m experimenting with using coroutines for parallel fork/join-style divide-and-conquer calculations. In my sample code (and admittedly, I’m new to coroutines), the performance is coming out dramatically worse than when I use Java’s Fork/Join API. Can someone help me understand what I’m doing wrong?

See my sample code and output. I take a big array, sum up each half of it, then add the two results together. I do this three ways: as two sequential calls, as two RecursiveTasks, and as two async coroutines. For small arrays, the overhead of parallelism doesn’t pay off, as expected. For larger arrays, the ForkJoin RecursiveTask definitely gets some speedup. However, the coroutines seem to always come out essentially equal to the sequential approach, regardless of size.

What’s going wrong here?

import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.time.measureTimedValue
import java.util.concurrent.RecursiveTask
import java.util.concurrent.ForkJoinPool

fun seqSubset(arr: Array<Double>, lo: Int, hi: Int): Double {
    var sum = 0.0
    for (i in lo..<hi) {
        sum += Math.sin(arr[i])
    }
    return sum
}

suspend fun suspendSumSubset(arr: Array<Double>, lo: Int, hi: Int): Double {
    var sum = 0.0
    for (i in lo..<hi) {
        sum += Math.sin(arr[i])
    }
    return sum
}

class SumTask(val arr: Array<Double>, val lo: Int, val hi: Int) : RecursiveTask<Double>() {

    override fun compute(): Double {
        var sum = 0.0
        for (i in lo..<hi) {
            sum += Math.sin(arr[i])
        }
        return sum
    }
}


fun main() {
    for (size in listOf(100_000, 1_000_000, 10_000_000, 100_000_000)) {
        println("---------size ${size}-------------")

        val r = Random(90125)
        val arr = Array(size) { r.nextDouble() }

        // Warmup the array
        val temp = seqSubset(arr, 0, size/2) + seqSubset(arr, size/2, size)

        // Add up both halves, sequentially
        val (seqSum, seqTime) = measureTimedValue {
            seqSubset(arr, 0, size/2) + seqSubset(arr, size/2, size)
        }
        println("Sequential: Sum = ${seqSum}, Time = ${seqTime}")

        // Add up both halves, in parallel using forkjoin
        val (forkjoinSum, forkjoinTime) = measureTimedValue {
            val pool = ForkJoinPool.commonPool()
            val first = pool.submit(SumTask(arr, 0, size/2))
            val second = pool.submit(SumTask(arr, size/2, size))
            first.get() + second.get()
        }

        println("Forkjoin:   Sum = ${forkjoinSum}, Time = ${forkjoinTime}")

        // Add up both halves, in parallel using coroutines
        val (parallelSum, parallelTime) = measureTimedValue {
            runBlocking {
                val one = async { suspendSumSubset(arr, 0, size/2) }
                val two = async { suspendSumSubset(arr, size/2, size) }
                one.await() + two.await()
            }
        }
        println("Coroutine:  Sum = ${parallelSum}, Time = ${parallelTime}")

    }
}
Output:

---------size 100000-------------
Sequential: Sum = 45955.64555073515, Time = 3.174088ms
Forkjoin:   Sum = 45955.64555073515, Time = 3.768909ms
Coroutine:  Sum = 45955.64555073515, Time = 32.491797ms
---------size 1000000-------------
Sequential: Sum = 459583.4914871033, Time = 13.475412ms
Forkjoin:   Sum = 459583.4914871033, Time = 18.685527ms
Coroutine:  Sum = 459583.4914871033, Time = 13.997924ms
---------size 10000000-------------
Sequential: Sum = 4596892.788499797, Time = 155.193015ms
Forkjoin:   Sum = 4596892.788499797, Time = 94.753415ms
Coroutine:  Sum = 4596892.788499797, Time = 151.826113ms
---------size 100000000-------------
Sequential: Sum = 4.59683565442808E7, Time = 1.682040672s
Forkjoin:   Sum = 4.59683565442808E7, Time = 1.081710663s
Coroutine:  Sum = 4.59683565442808E7, Time = 1.693147256s

(P.S. I arrived at this example by simplifying a recursive divide-and-conquer version, which exaggerated these results even further. I made it non-recursive here to keep the question constrained.)

Please use runBlocking(Dispatchers.Default). By default, runBlocking runs its coroutines on a single-threaded event loop confined to the thread that called it, so both of your async blocks share one thread, and it is expected that the numbers come out similar to the sequential approach.

BTW, suspendSumSubset never actually suspends, so it could be a regular function. But if you plan to fork recursively, it will need to be a suspend function.
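To illustrate, the coroutine branch of your benchmark could look like this (a minimal sketch; I'm using a DoubleArray and a non-suspending sum for brevity):

```kotlin
import kotlinx.coroutines.*

// Plain CPU-bound summation; DoubleArray is used here just for the sketch.
fun sumSubset(arr: DoubleArray, lo: Int, hi: Int): Double {
    var sum = 0.0
    for (i in lo..<hi) sum += Math.sin(arr[i])
    return sum
}

fun main() {
    val arr = DoubleArray(1_000_000) { it * 1e-6 }
    // Dispatchers.Default is backed by a pool sized to the CPU count,
    // so the two async blocks can actually run on different threads.
    val parallel = runBlocking(Dispatchers.Default) {
        val one = async { sumSubset(arr, 0, arr.size / 2) }
        val two = async { sumSubset(arr, arr.size / 2, arr.size) }
        one.await() + two.await()
    }
    val sequential = sumSubset(arr, 0, arr.size / 2) +
        sumSubset(arr, arr.size / 2, arr.size)
    check(parallel == sequential) // same split, same addition order
    println("Sum = $parallel")
}
```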


@broot Thanks so much. I tried that, and the difference is noticeable. I'm now seeing times that are roughly consistent with ForkJoin: a little slower, but in the same ballpark.

So I went back to my recursive version, and I got that to work reasonably as well. Thanks for your help!

I do have a followup question. Here’s the recursive version of my sum function. In order to make it work, I used withContext(currentCoroutineContext()) to avoid overheads of creating new contexts. Was there a better way to supply the context? Code is below.

suspend fun coroutineRecursive(arr: Array<Double>, lo: Int, hi: Int): Double {

    // Cutoff to try to avoid some overhead
    if (hi - lo <= 10000) {
        var ans = 0.0
        for (i in lo..<hi) {
            ans += Math.sin(arr[i])
        }
        return ans
    } else {
        val mid = (lo + hi) / 2
        return withContext(currentCoroutineContext()) {
            val left = async { coroutineRecursive(arr, lo, mid) }
            val right = coroutineRecursive(arr, mid, hi)
            left.await() + right
        }
    }
}

withContext is for… well, changing the context. If you just need to get access to the coroutine scope in order to fork, then the usual way is coroutineScope { }.

To answer your question directly: it would be something like withContext(EmptyCoroutineContext). You don't need to pass the current context, since it is inherited automatically. But again, coroutineScope is the right choice here.
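Concretely, your recursive function rewritten with coroutineScope might look like this (a sketch, keeping your cutoff of 10,000):

```kotlin
import kotlinx.coroutines.*

// Recursive sum using coroutineScope instead of
// withContext(currentCoroutineContext()). coroutineScope exposes a scope
// for forking and inherits the caller's context (dispatcher included).
suspend fun coroutineRecursive(arr: Array<Double>, lo: Int, hi: Int): Double {
    // Sequential cutoff to avoid per-coroutine overhead on tiny ranges
    if (hi - lo <= 10_000) {
        var ans = 0.0
        for (i in lo..<hi) ans += Math.sin(arr[i])
        return ans
    }
    val mid = (lo + hi) / 2
    return coroutineScope {
        val left = async { coroutineRecursive(arr, lo, mid) } // fork
        val right = coroutineRecursive(arr, mid, hi)          // compute in place
        left.await() + right                                  // join
    }
}

fun main() {
    val arr = Array(1_000_000) { Math.random() }
    // Dispatchers.Default supplies the thread pool; the recursion inherits it.
    val total = runBlocking(Dispatchers.Default) {
        coroutineRecursive(arr, 0, arr.size)
    }
    println("Sum = $total")
}
```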


Thanks again, @broot, that works great.

A paradigm that coroutines should follow is main-safety: it should be safe to call any suspend function from the main thread. So your function should look like:

suspend fun coroutineRecursive(arr: Array<Double>, lo: Int, hi: Int): Double = withContext(Dispatchers.Default) {
    .
    .
    .
}

Then you don’t need to specify a dispatcher in runBlocking().
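Spelled out with the recursive body from earlier in the thread, that might look like this (a sketch under my suggestion, not the original poster's code; I've renamed the function to mainSafeRecursive to distinguish it):

```kotlin
import kotlinx.coroutines.*

// Main-safe sketch: the function hops to Dispatchers.Default itself,
// so callers on the main thread don't have to think about dispatchers.
suspend fun mainSafeRecursive(arr: Array<Double>, lo: Int, hi: Int): Double =
    withContext(Dispatchers.Default) {
        if (hi - lo <= 10_000) {
            var ans = 0.0
            for (i in lo..<hi) ans += Math.sin(arr[i])
            ans
        } else {
            val mid = (lo + hi) / 2
            val left = async { mainSafeRecursive(arr, lo, mid) } // fork
            val right = mainSafeRecursive(arr, mid, hi)          // compute in place
            left.await() + right                                 // join
        }
    }

fun main() = runBlocking { // note: no dispatcher argument needed here
    val arr = Array(1_000_000) { Math.random() }
    println("Sum = ${mainSafeRecursive(arr, 0, arr.size)}")
}
```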

I disagree. Reusing the caller's dispatcher is an intentional feature of coroutines: it lets the caller control the parallelism, thread pools, etc. used by a larger block of coroutine code. Starting each and every suspend function with a dispatcher switch (if that is what you're suggesting) is an anti-pattern in my view; I don't think it is recommended anywhere in the coroutine docs or in Roman Elizarov's blog.
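For instance, here is a hypothetical example of what that caller-side control buys you: the forking code never mentions a dispatcher, so the caller can cap it to a two-thread pool (the fixed pool is just for demonstration):

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    // The caller owns the dispatcher choice: here, a two-thread pool.
    val pool = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
    pool.use {
        val results = runBlocking(pool) {
            // This forking code never mentions a dispatcher; it simply
            // inherits whatever context the caller launched it in.
            (1..4).map { n -> async { n * n } }.awaitAll()
        }
        println(results) // prints [1, 4, 9, 16]
    }
}
```

If the function had hard-coded withContext(Dispatchers.Default) internally, the caller's two-thread limit would be silently bypassed.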


I completely agree with you there. On the other hand, if you’re explicitly doing something like IO, I think switching dispatchers internally is the right thing to do.

Yes, if a given function has a specific need to block, to run something in a main thread or in a specific thread pool, then the function itself should make the switch internally. But in a general case, I think it is better to not touch dispatchers and let the caller choose.


As I’m following this and working to learn from it, I found this advice which talks about making suspend functions main-safe. Would you consider this to be an example of what you refer to, i.e., a function that has a specific need to block?

Here’s a post from Elizarov where he talks about it.

Thank you.

In fact, Roman addressed exactly the case we're discussing here. He considers CPU-bound code to be blocking in a way similar to I/O, so he suggested switching to Dispatchers.Default inside the function. I still feel we should approach this carefully, case by case, because following the rule strictly could easily lead to starting every single function with withContext. But you are right.

I guess it's more that any suspend function that is used by something else should be main-safe. That might be something provided by a third-party library, or it might be a function in your app's repository that is called by the view model.

I agree that if you’re writing some self-contained asynchronous code where you want to control exactly how it’s run, then having the top-level code define the dispatchers and the lower-level code use them is the way to go.