I’m experimenting with using coroutines for parallel fork/join-style divide-and-conquer calculations. In my sample code (and admittedly, I’m new to coroutines), the performance is coming out dramatically worse than when I use Java’s Fork/Join API. Can someone help me understand what I’m doing wrong?
See my sample code and output. I take a big array, sum up each half of it, then add the two results together. I do this three ways: as two sequential calls, as two RecursiveTasks, and as two async coroutines. For small arrays, the overhead of parallelism doesn’t pay off, as expected. For larger arrays, the ForkJoin RecursiveTask definitely gets some speedup. However, the coroutines seem to always come out essentially equal to the sequential approach, regardless of size.
What’s going wrong here?
import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.time.measureTimedValue
import java.util.concurrent.RecursiveTask
import java.util.concurrent.ForkJoinPool

// Sums sin(arr[i]) over the half-open range [lo, hi) on the calling thread.
fun seqSubset(arr: Array<Double>, lo: Int, hi: Int): Double {
    var sum = 0.0
    for (i in lo..<hi) {
        sum += Math.sin(arr[i])
    }
    return sum
}

// Same computation as seqSubset, declared as a suspend function for the coroutine version.
suspend fun suspendSumSubset(arr: Array<Double>, lo: Int, hi: Int): Double {
    var sum = 0.0
    for (i in lo..<hi) {
        sum += Math.sin(arr[i])
    }
    return sum
}

// Same computation as seqSubset, wrapped in a Fork/Join task.
class SumTask(val arr: Array<Double>, val lo: Int, val hi: Int) : RecursiveTask<Double>() {
    override fun compute(): Double {
        var sum = 0.0
        for (i in lo..<hi) {
            sum += Math.sin(arr[i])
        }
        return sum
    }
}

fun main() {
    for (size in listOf(100_000, 1_000_000, 10_000_000, 100_000_000)) {
        println("---------size ${size}-------------")
        val r = Random(90125)
        val arr = Array<Double>(size) { r.nextDouble() }

        // Warm up: run the sequential sum once before timing anything
        val temp = seqSubset(arr, 0, size / 2) + seqSubset(arr, size / 2, size)

        // Add up both halves, sequentially
        val (seqSum, seqTime) = measureTimedValue {
            seqSubset(arr, 0, size / 2) + seqSubset(arr, size / 2, size)
        }
        println("Sequential: Sum = ${seqSum}, Time = ${seqTime}")

        // Add up both halves, in parallel using forkjoin
        val (forkjoinSum, forkjoinTime) = measureTimedValue {
            val pool = ForkJoinPool.commonPool()
            val first = pool.submit(SumTask(arr, 0, size / 2))
            val second = pool.submit(SumTask(arr, size / 2, size))
            first.get() + second.get()
        }
        println("Forkjoin: Sum = ${forkjoinSum}, Time = ${forkjoinTime}")

        // Add up both halves, in parallel using coroutines
        val (parallelSum, parallelTime) = measureTimedValue {
            runBlocking {
                val one = async { suspendSumSubset(arr, 0, size / 2) }
                val two = async { suspendSumSubset(arr, size / 2, size) }
                one.await() + two.await()
            }
        }
        println("Coroutine: Sum = ${parallelSum}, Time = ${parallelTime}")
    }
}
---------size 100000-------------
Sequential: Sum = 45955.64555073515, Time = 3.174088ms
Forkjoin: Sum = 45955.64555073515, Time = 3.768909ms
Coroutine: Sum = 45955.64555073515, Time = 32.491797ms
---------size 1000000-------------
Sequential: Sum = 459583.4914871033, Time = 13.475412ms
Forkjoin: Sum = 459583.4914871033, Time = 18.685527ms
Coroutine: Sum = 459583.4914871033, Time = 13.997924ms
---------size 10000000-------------
Sequential: Sum = 4596892.788499797, Time = 155.193015ms
Forkjoin: Sum = 4596892.788499797, Time = 94.753415ms
Coroutine: Sum = 4596892.788499797, Time = 151.826113ms
---------size 100000000-------------
Sequential: Sum = 4.59683565442808E7, Time = 1.682040672s
Forkjoin: Sum = 4.59683565442808E7, Time = 1.081710663s
Coroutine: Sum = 4.59683565442808E7, Time = 1.693147256s
(P.S. I arrived at this example by simplifying a recursive divide-and-conquer version, where the gap between the approaches was even more exaggerated. I made it non-recursive here to keep the question more constrained.)
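One way to narrow this down, offered as a minimal sketch and not a full diagnosis, is to check which thread each async block actually runs on. The helper name ranOnCallerThread below is hypothetical and not part of the code above. It starts two async blocks inside runBlocking, exactly as the coroutine version does, and reports whether each one executed on the thread that called runBlocking. It is run twice: once with an empty context, which is equivalent to a plain async { ... }, and once with Dispatchers.Default for comparison.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical helper: for each of two async blocks, reports whether it ran
// on the same thread that called runBlocking.
fun ranOnCallerThread(context: CoroutineContext): List<Boolean> {
    val caller = Thread.currentThread()
    return runBlocking {
        // async(EmptyCoroutineContext) behaves like a plain async { ... }:
        // the child inherits whatever context runBlocking set up.
        val one = async(context) { Thread.currentThread() === caller }
        val two = async(context) { Thread.currentThread() === caller }
        listOf(one.await(), two.await())
    }
}

fun main() {
    println("Inherited context:   ${ranOnCallerThread(EmptyCoroutineContext)}")
    println("Dispatchers.Default: ${ranOnCallerThread(Dispatchers.Default)}")
}
```

If the first line reports true for both blocks, the two half-sums are sharing the calling thread and cannot overlap in time, which would be consistent with the coroutine timings matching the sequential ones.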