Are flows/coroutines slow?

I am observing slow performance from coroutines/flows.

Here is the snippet

    private suspend fun Collection<Solver>.solve(context) = coroutineScope {
        val channel = produce {
            forEach {
                launch { send(it.solve()) }
            }
        }

        val result = ArrayList<Solution>(size)
        launch {
            for (solution in channel) {
                context.update(solution)
                result.add(solution)
            }
        }
        result
    }

I have a collection of solvers (1000+) which I want to run in parallel, collecting their results and updating some context. I have a machine with 190 cores. When running this code I see that the CPU is barely used (10-15%), and YourKit shows that the threads are in a wait state most of the time.

Is this expected behavior? Are there ways to utilize the threads better?

What dispatcher do you use? What are the solvers? Are they suspending (is Solver.solve() a suspend function)? Are they purely CPU-intensive and independent, or do they do some I/O, access synchronized resources, or make threads sleep for any other reason? How heavy is the context.update() operation?

BTW, I think you can replace your code with simply this:

    coroutineScope {
        map { async { it.solve() } }.awaitAll()
    }

You would need to add context.update() somewhere.
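
For instance, one option (just a sketch, written as the body of the same Collection<Solver>.solve extension as in your snippet) is to run the updates sequentially after awaitAll:

    coroutineScope {
        map { async { it.solve() } }          // start all solvers concurrently
            .awaitAll()                       // wait for every result
            .onEach { context.update(it) }    // update the context, then return the list
    }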

I wasn’t clear about the caller of this function. It goes basically like this:

    while (someCondition) {
        val solutions = solvers.solve(context)
        doSomethingWithResults(solutions)
    }

solve is called many times.
awaitAll is not an option. The reason is that there is a distribution in solve times, roughly 100µs-800µs, depending on the complexity of the problem being solved. solve primarily does a binary sort or a min search in an array of objects. The performance of the function in that case would be the time of the slowest execution plus the context updates.
This is what thread usage looks like: the threads just sit and wait for work.

I need to figure out why.

Again, it is very important which dispatcher you use, because the dispatcher directly controls how many and which threads you use. How do you start the coroutine that calls the above code?
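
To illustrate why that matters (only an illustration, not something confirmed from your setup): child coroutines inherit the dispatcher of their parent scope, so if the caller is, say, a plain runBlocking, every launch and async inside solve runs on that one thread:

    import kotlinx.coroutines.*

    fun main() = runBlocking<Unit> {
        // Inherits runBlocking's single-threaded event loop: no parallelism here.
        launch { println(Thread.currentThread().name) }                        // prints "main"

        // Runs on the shared pool sized to the number of CPU cores.
        launch(Dispatchers.Default) { println(Thread.currentThread().name) }   // prints "DefaultDispatcher-worker-N"
    }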

You don’t have to update the context after collecting all results; you can update while collecting:

    coroutineScope {
        map { async { it.solve() } }
            .map { job -> job.await().also { context.update(it) } }
    }

This is still a little different from your solution, because it consumes results in order. If, e.g., the 5th item is processed the fastest, updating it will still wait for the first item. However, that problem should affect only the first/last (depending on perspective) batch of items, so with 1000+ items and 190 threads I think the performance hit should be minor. But you are correct: by using a channel we start updating items as soon as possible, since we don’t care about their ordering.
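
For completeness, here is a sketch of a channel-based variant that consumes results in completion order. Two things differ from your snippet: the consumer loop runs directly in the scope, so the function only returns after everything has been collected (in your version result is returned while the collecting launch may still be running), and the context parameter gets a placeholder type Context, since the real type isn’t shown in the thread:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.produce

    private suspend fun Collection<Solver>.solveUnordered(context: Context): List<Solution> =
        coroutineScope {
            // The producer coroutine closes the channel only after all launched children have sent.
            val channel = produce {
                forEach { solver ->
                    launch { send(solver.solve()) }
                }
            }
            val result = ArrayList<Solution>(size)
            // Results arrive in completion order, so fast solvers are processed immediately.
            for (solution in channel) {
                context.update(solution)
                result.add(solution)
            }
            result
        }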

Do your solve algorithms utilize features of coroutines, so that they are suspending, they fork and join, etc., or are they fully sequential? I ask because if you have tasks that are purely CPU-intensive, never wait for anything, never fork, etc., and you just need to schedule them on threads, then coroutines won’t really give you much benefit over the traditional solution with executors. There are other reasons why you could choose coroutines, for example their API or structured concurrency, but at least the performance should be comparable to executors.
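
For comparison, this is roughly what the traditional executor-based solution could look like (a sketch that assumes Solver.solve() is a plain, non-suspending, CPU-bound function, and again uses Context as a placeholder type):

    import java.util.concurrent.Callable
    import java.util.concurrent.Executors

    fun Collection<Solver>.solveOnExecutor(context: Context): List<Solution> {
        val pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
        try {
            // invokeAll blocks until every task has finished and preserves submission order.
            return pool.invokeAll(map { solver -> Callable { solver.solve() } })
                .map { future -> future.get().also { context.update(it) } }
        } finally {
            pool.shutdown()
        }
    }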

You should probably add a dispatcher:

    launch(Dispatchers.Default) { send(it.solve()) }
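
Or, instead of specifying it on every inner launch, the dispatcher can be applied once to the whole operation, e.g. with withContext, so that every child coroutine inherits it (again a sketch, with Context as a placeholder type):

    import kotlinx.coroutines.*

    private suspend fun Collection<Solver>.solve(context: Context): List<Solution> =
        withContext(Dispatchers.Default) {
            // Every async below inherits Dispatchers.Default from this scope.
            map { async { it.solve() } }
                .map { deferred -> deferred.await().also { context.update(it) } }
        }

Dispatchers.Default is sized to the number of available CPU cores, so on a 190-core machine purely CPU-bound work scheduled on it should be able to keep all the cores busy.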
