Issue running coroutine concurrently

I am trying to run multiple independent task parallel to each other using coroutine, but unfortunately they are running one after another, but it really works in case of example, and not in case of real implementation.

In the below example, Task3 completes first and than 2 and finally 1

fun main() {
    val tasklist = listOf(
        Task("task1", 5000),
        Task("task2", 3000),
        Task("task3", 1000)
    )
    taskrunner(tasklist)
}

data class Task(val name: String, val delay: Long)

fun taskrunner(tasklist: List<Task>) = runBlocking {
    tasklist.forEach {
        async {
            runTask(it)
        }
    }
}

suspend fun runTask(task: Task) {
    println("starting task ${task.name}")
    delay(task.delay)
    println("completing task ${task.name}")
}

But when I replace runTask with the real implementation or simply replacing delay with Thread.sleep, it becomes sequential instead of parallel.

fun runTask(task: Task) {
    println("starting task ${task.name}")
    Thread.sleep(task.delay)
    println("completing task ${task.name}")
}

Can someone please help me fixing this issue

By default runBlocking() uses the thread that invoked it to create a single threaded coroutine dispatcher and coroutines are invoked only within this single thread. To use other threads you need to provide alternative dispatcher, e.g.:

runBlocking(Dispatchers.Default) {  }

Also note that by blocking the thread (Thread.sleep()) you basically always do tasks partially sequentially. For example, if you have 4 CPU cores and you run 20 tasks like above then even with Dispatchers.Default only first 4 tasks will execute in parallel and the rest will start when the first will finish. This is different with delay() as it does not block the thread.

1 Like

I’d set the dispatcher on the async call instead of the runBlocking call.

Hmm… is there any important difference between these two or is it just a matter of taste?

In this particular instance, they’re the same. But when the OP moves from a toy problem to real code, they won’t be using the runBlocking.

You can launch just one coroutine and then send tasks to the channel.

fun taskrunner(tasklist: List<Task>) = runBlocking {
    val channel = Channel<Task>()
    launch(Dispatchers.Default) {
        for (task in channel) {
            try {
                runTask(task)
            } catch (e: Exception) {
                // log or ignore
            }
        }
    }
    tasklist.forEach { channel.send(it) }
    channel.close()
}

Another solution is to use newSingleThreadContext(), but this may be tricky.

Ok, but why do you intentionally run tasks sequentially if this is exactly what OP complained about?

(well, newSingleThreadContext() does not really dispatch sequentially, but it greatly limits concurrency)

Well, perhaps I misunderstood the original question…