Coroutines concurrency in a loop

Hello, I’m playing with coroutines and am curious how to make this loop concurrent. Currently all calculations run on the same thread, DefaultDispatcher-worker-1. This code runs in 41s and has O(n) time complexity (n = 20).

    @Test
    fun performanceAsync() = runBlocking {
        val count = 20
        val list = mutableListOf<String>()

        repeat(count) {
            val d1 = async(Dispatchers.Default) { doHardStuff(it * 2) }
            d1.await().apply { list.add(this) }
            val d2 = async(Dispatchers.Default) { doHardStuff(it * 2 + 1) }
            d2.await().apply { list.add(this) }
        }
        assertEquals((0 until count * 2).joinToString(), list.joinToString())
    }
    private suspend inline fun doHardStuff(value: Int): String {
        println("${Thread.currentThread().name}: $value")
        delay(1000L)
        return value.toString()
    }

The problem seems to be calling await() right after each async(), which makes every call wait for the previous one. So, slightly changing the code, I have this:

    @Test
    fun performanceAsync2() = runBlocking {
        val count = 20
        val list = mutableListOf<String>()
        val d1 = mutableListOf<Deferred<String>>()
        val d2 = mutableListOf<Deferred<String>>()

        repeat(count) {
            d1.add(async(Dispatchers.Default) { doHardStuff(it * 2) })
            d2.add(async(Dispatchers.Default) { doHardStuff(it * 2 + 1) })
        }
        repeat(count) {
            list.add(it * 2, d1.awaitAll()[it])
            list.add(it * 2 + 1, d2.awaitAll()[it])
        }
        assertEquals((0 until count * 2).joinToString(), list.joinToString())
    }

This code runs in 1s but has O(2n) time complexity. So how can I make the code run in 1s with O(n) time complexity? (count = 20 shouldn’t be changed 🙂)

Why does it bother you to have O(2n) complexity? In most cases it is considered the same as O(n).

Yes, it’s just a test case. But in the real world I’d like to make my code shorter. The main question is: how do I run coroutines in a loop concurrently on different threads (workers)? How can I improve my first sample to run in 1s without adding a lot of extra code (the 2nd sample is too verbose)?

Your second example is exactly what you need. It can definitely be shorter/cleaner, but mostly by using things like map(), not by changing the way you use coroutines/deferred values.

No, this is not true. Time complexity does not tell you how performant your code is in general. It tells you how your code will behave in relation to the number of iterations.

For example, suppose you run a loop twice at 1s per iteration, then “optimize” it into a single loop that takes 3s per iteration. Your code gets slower even though you went from O(2n) to O(n). Comparing O(2n) and O(n) usually doesn’t make sense, because you would also need to measure the time each iteration takes. In fact, in your code above you basically split one loop that does more per iteration into two loops that each do half the work per iteration.
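
To put numbers on that example, here is a rough sketch of the arithmetic. The cost model and the name totalSeconds are made up for illustration; it assumes the iterations run one after another with a fixed cost each.

```kotlin
// Hypothetical cost model: total time = passes * iterations * seconds per iteration.
// Assumes strictly sequential iterations with a constant cost.
fun totalSeconds(passes: Int, iterations: Int, secondsPerIteration: Double): Double =
    passes * iterations * secondsPerIteration

fun main() {
    val n = 20
    // "O(2n)": two loops over n items, 1s per iteration.
    val twoPasses = totalSeconds(passes = 2, iterations = n, secondsPerIteration = 1.0)
    // "O(n)": one loop over n items, 3s per iteration.
    val onePass = totalSeconds(passes = 1, iterations = n, secondsPerIteration = 3.0)
    println("two passes: $twoPasses s, one pass: $onePass s")
    // prints "two passes: 40.0 s, one pass: 60.0 s"
}
```

The single-pass version is the slower one here, which is why the constant factor inside the O() says nothing useful on its own.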

Is it short enough?

    val list = (0 until count * 2 step 2)
        .flatMap { listOf(
            async(Dispatchers.Default) { doHardStuff(it) },
            async(Dispatchers.Default) { doHardStuff(it + 1) },
        ) }
        .awaitAll()

Technically, there are still two loops.

Also, for now it doesn’t really make sense to split each iteration into d1 and d2, because they both do the same thing, but I guess your point is that you need to e.g. use doHardStuff() for odds and doHardStuff2() for evens, so I intentionally did not make it even shorter.
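
For reference, if both halves really do call the same function, a minimal sketch of the shorter form could look like the one below. computeAll is a hypothetical name, and doHardStuff is re-declared (without the println) only to keep the snippet self-contained; it assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Stand-in for the function from the first post.
suspend fun doHardStuff(value: Int): String {
    delay(1000L)
    return value.toString()
}

// Hypothetical helper: start every computation first, then await them all.
// awaitAll() returns the results in the same order as the Deferred list.
suspend fun computeAll(total: Int): List<String> = coroutineScope {
    (0 until total)
        .map { n -> async(Dispatchers.Default) { doHardStuff(n) } }
        .awaitAll()
}

fun main(): Unit = runBlocking {
    val values = computeAll(40)
    println(values.joinToString())
}
```

All async() calls are issued before anything is awaited, so the delays overlap instead of adding up.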

Good enough. Looks close to a sample from here.

If I want to do such calculations the “old way” within a thread, it looks slightly awful (and incorrect):

    @Test
    fun performanceAsync4() = runBlocking {
        val count = 20
        val list = synchronizedList(mutableListOf<String>())
        val jobs = mutableListOf<Job>()
        val i = AtomicInteger(0)
        while (i.toInt() < count * 2 - 1){
            jobs.add(launch(Dispatchers.Default) {
                println("${Thread.currentThread().name}: $i")
                delay(1000L)
                list.add(i.toString() )
                i.incrementAndGet()
            })
        }
        jobs.joinAll()
        assertEquals((0 until count * 2).joinToString(), list.joinToString())
    }

Can it be improved, or is this style of coding simply out of place in the modern coroutines world?

P.S. It works with this change 🙂: while (jobs.size < count * 2) {

I’m not really sure what your goal is here. There are always three possible solutions to every problem: good, bad and ugly 😉

The meaning of “old way” in your post is unclear. You mentioned threads, but your solution is still based on coroutines. “Old way” could mean using loops instead of functional style, or using shared variables instead of deferred values. But even with loops and a shared results list, the code could be pretty clean.

Also, your code is incorrect even after changing the while condition. It may generate the same item multiple times, because reading i and incrementing it are two separate steps, so several coroutines can read the same value. And I would avoid using synchronized code with coroutines, because Java synchronization blocks threads. It may not be a big deal in the case of synchronized lists, as the blocking is very short.
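
As a rough sketch of what a loop plus shared results could look like without synchronized collections: each coroutine below captures its own index and writes only to its own slot of a pre-sized array. computeWithLoop is a hypothetical name, doHardStuff is re-declared only to keep the snippet self-contained, and the sketch relies on coroutineScope returning only after all launched children have completed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Stand-in for the function from the first post.
suspend fun doHardStuff(value: Int): String {
    delay(1000L)
    return value.toString()
}

// Hypothetical helper: plain for loop, shared array, no locks.
suspend fun computeWithLoop(total: Int): List<String> {
    val results = arrayOfNulls<String>(total)
    coroutineScope {
        for (n in 0 until total) {
            // n is a fresh value per iteration, so no two coroutines share an index.
            launch(Dispatchers.Default) { results[n] = doHardStuff(n) }
        }
    } // suspends here until every launched child has finished
    return results.map { checkNotNull(it) }
}

fun main(): Unit = runBlocking {
    println(computeWithLoop(40).joinToString())
}
```

Because the index is fixed when the coroutine is launched rather than read from a shared counter later, no item can be produced twice, and the result order does not depend on which coroutine finishes first.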

Using the new buildList() builder, we can also calculate it this way 😉

    @Test
    fun performanceAsync() = runBlocking {
        val count = 40

        val values = buildList(count) {
            for (n in 0 until count step 2) {
                this += async(Dispatchers.Default) { doHardStuff(n) }
                this += async(Dispatchers.Default) { doHardStuff(n + 1) }
            }
        }.awaitAll()

        assertThat(values.joinToString()).isEqualTo((0 until count).joinToString())
    }

Well, if you ask me, splitting those async() calls into two statements doesn’t make much sense in this case.
