Coroutines and deadlocks

I have been experimenting with using coroutines to add some parallelisation features to my project. I came across a deadlock, and I have created a small testcase that illustrates the problem.

When running the following code, all the threads end up deadlocking due to the inner runBlocking. Note that in the real code, there are a lot of non-suspending functions between main and computeString, so making everything suspend is not a solution in this case.

What is the proper way to achieve this?

import kotlinx.coroutines.*

fun computeString(index: Int): String {
    val d = CompletableDeferred<Int>()
    GlobalScope.launch {
        d.complete(index)
    }
    val number = runBlocking {
        d.await()
    }
    return "String from: ${index}. Number: ${number}"
}

fun main() {
    println("This is a test of coroutines")
    val jobs = ArrayList<Deferred<String>>()
    // Repeat count should be higher than the number of cores
    repeat(50) { i ->
        val d = CompletableDeferred<String>()
        GlobalScope.launch {
            d.complete(computeString(i))
        }
        jobs.add(d)
    }

    runBlocking {
        jobs.forEach { d ->
            val text = d.await()
            println("Result: ${text}")
        }
    }
}

Not sure, but looks like a bug in coroutines. Simplified reproducer:

import kotlinx.coroutines.*

suspend fun main() {
    (1..Runtime.getRuntime().availableProcessors()).map {
        GlobalScope.async {
            runBlocking {
                GlobalScope.async{}.await()
            }
        }
    }.awaitAll()

    println("done")
}

If I remember correctly, Default dispatcher uses FJP, which should automatically expand when all threads are blocked.

You launched a runBlocking inside the default dispatcher for each core… You are abusing the coroutine machinery! Do not use runBlocking from an asynchronous context. It is meant for entering an asynchronous context from blocking code!

Also, IntelliJ IDEA strongly warns you against using runBlocking inside a suspending context. Do not ignore its wise advice!

Yes, usually blocking code is executed using Dispatchers.IO. But this is not about best practices. It’s about coroutines deadlocking once they reach the initial capacity of the thread pool.

Fair enough, but how is this supposed to be solved then? Remember that in the actual application there are plenty of non-suspending functions between the two functions, so I need some way of calling await so I can read the result.

Generally, this is how you wait for blocking code:

withContext(Dispatchers.IO) {
   blockingMethod()
} 

withContext will suspend until the blocking call returns.

Coroutines make async code easier to read. They do not fix design flaws. If your code blocks a thread while waiting on an action that runs in the same thread pool, you are asking for a deadlock, regardless of which async library you use.
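To illustrate that this is not specific to coroutines, here is a minimal sketch using only plain java.util.concurrent executors. The helper name blockOnInner, the single-thread pools and the 200 ms timeout are assumptions made for the demonstration. The timeout only exists so that the example terminates instead of hanging.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: runs a task on `outer` that blocks while waiting
// for another task submitted to `inner`.
fun blockOnInner(outer: ExecutorService, inner: ExecutorService): String =
    outer.submit<String> {
        val innerResult = inner.submit<String> { "inner done" }
        try {
            // Bounded wait so the demo finishes; an unbounded get() would hang
            // when `outer` and `inner` are the same exhausted pool.
            innerResult.get(200, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            "starved"
        }
    }.get()

fun main() {
    val shared = Executors.newSingleThreadExecutor()
    val separate = Executors.newSingleThreadExecutor()
    try {
        // Same pool: its only thread is occupied by the waiting task.
        println(blockOnInner(shared, shared))
        // Different pools: the inner task has a thread of its own.
        println(blockOnInner(shared, separate))
    } finally {
        shared.shutdown()
        separate.shutdown()
    }
}
```

With a single-thread pool the first call should report "starved" and the second should return the inner result. A larger pool behaves the same way once every thread is occupied by a waiting task.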

Ensure that any method blocking its thread on an async task cannot block the async task. For example, call runBlocking from your own regular thread pool while the async coroutine runs in the provided Dispatchers.
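A rough sketch of that arrangement, based on the test case from the first post. The function computeAll and the pool size are assumptions for illustration only. The outer loop runs on a regular executor, so only threads of that executor are ever blocked by runBlocking, while the inner coroutine runs on Dispatchers.Default.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Inner step: still blocking from the caller's point of view, but the work
// it waits for runs on Dispatchers.Default.
fun computeString(index: Int): String {
    val number = runBlocking {
        async(Dispatchers.Default) { index }.await()
    }
    return "String from: $index. Number: $number"
}

// Hypothetical driver: the outer parallelism uses its own thread pool, so no
// Dispatchers.Default thread is ever parked inside runBlocking.
fun computeAll(count: Int): List<String> {
    val pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
    try {
        val futures = (0 until count).map { i -> pool.submit<String> { computeString(i) } }
        return futures.map { it.get() }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    computeAll(50).forEach { println("Result: $it") }
}
```

Note that this only moves the problem one level out: if computeAll were itself called from a coroutine on Dispatchers.Default, or nested inside another task on the same executor, the blocking waits could exhaust a pool again.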

Don’t assume that Dispatchers.IO is unlimited. Consider using your own dispatcher.

Dispatchers.Default no longer uses FJP by default. Moreover, FJP is a very limited executor.
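As a minimal sketch of what a dedicated dispatcher could look like: blockingMethod is a hypothetical stand-in for a blocking third-party call, callBlocking is an illustrative helper, and the pool size of 4 is an arbitrary assumption.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical stand-in for a blocking call into a third-party library.
fun blockingMethod(): String {
    Thread.sleep(50)
    return "done"
}

// Suspends the caller while the blocking call runs on the given dispatcher.
suspend fun callBlocking(dispatcher: CoroutineDispatcher): String =
    withContext(dispatcher) { blockingMethod() }

fun main() = runBlocking {
    // A dispatcher backed by its own fixed pool. It is Closeable, so `use`
    // shuts the underlying executor down when the block completes.
    Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { blockingPool ->
        println(callBlocking(blockingPool))
    }
}
```

A fixed pool like this is still bounded, so it does not remove the need to avoid blocking on work that is queued on the same pool.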

What is the FJP dispatcher? I assumed that the default dispatching mechanism was similar to how processes work in Erlang, where you can fire off any number you want and it’ll be scheduled appropriately.

In my test code, it’s definitely not obvious that a deadlock can occur. Imagine the two functions being part of two separate libraries. On their own they are perfectly reasonable, but when run together they deadlock. That’s very surprising to me.

What seems to happen here is that even though there are threads that could run, they don’t get a chance to run because the dispatcher has locked up all available threads. What I would expect to happen here is that one of the blocked threads would become unblocked, and directed to run one of the other coroutines that are waiting to run. This is what Erlang would do in this case.

The Kotlin coroutine documentation suggests that I can treat coroutines as lightweight threads that work like threads but are dispatched on a smaller number of actual operating system threads. This seems to not be the case, at least not with the dispatchers that are available by default?

The Java ForkJoinPool Executor.

Where did you find that?

Yes, but you have to avoid any blocking function, like runBlocking.

Neither of them is reasonable.
If you block a shared, limited pool on the assumption that it always contains enough threads to execute your code, then you are wrong.

But your platform is JVM, not BEAM.

You are right, so you should take care never to block any of the Dispatchers.Default threads, and consider Dispatchers.IO for blocking invocations.

runBlocking is a bridge function. You should use it if and only if you must block the thread.
My suggestion for you: use Dispatchers.Default as if it contained only one (or two) threads for the whole application.

Details for each dispatcher are in the docs:
Dispatchers.Default
Dispatchers.IO

By the way, the task you are trying to write can be easily expressed as:

suspend fun computeString(index: Int) =
    "String from: $index. Number: $index"

suspend fun main() = coroutineScope {
    repeat(50) {
        launch { println("Result: ${computeString(it)}") }
    }
}

Also the code you wrote contains so many conceptual errors that what you are trying to do is not completely understandable. Could you elaborate what you need?

Thanks for the reply, and my apologies for the tone in my previous message. I was frustrated after having messed with this for a long time.

I should definitely have explained the issue better.

What I have is a very large number of compute units. They are in an array, which is simulated in my test code above by the repeat loop. In the real code, there is a loop that iterates over all elements of the array (it could be millions of them) and computes each one. The computation function is represented by the function computeString in my example code.

Since each compute unit is independent, I can do it in parallel, and simply using the default dispatcher and a set of Deferred instances solved this neatly. In a manner similar to what one can do in Erlang, things were being computed in parallel limited by the number of CPU cores, and I wasn’t creating millions of real threads.

Now, note that your suggestion to make computeString a suspend function isn’t really possible, because even though it’s a simple function in my example, it’s actually the bulk of the application that sits behind it, and some of the functions call into third-party Java libraries.

The problem I discovered was that if one of the compute functions uses the same trick to parallelise its internals, then you end up deadlocking the application. Note how the two uses of the “create an array of deferred instances and start a number of coroutines to calculate them” pattern are independent, and each works fine on its own, but when put together we find ourselves with a deadlock.

I was originally going to write my own threading code with a manual thread pool, job queues etc, but I wanted to try using coroutines first since if it worked the way I assumed (that they work just like threads) the implementation would be a lot cleaner.

The documentation does say something like “coroutines are like lightweight threads”. I’m on mobile right now so I’m not finding the link, but it’s on the first page of the guide.

Given this explanation (which turned out to be long, apologies for that), what is your advice? Should I simply stop using coroutines for this and create my own framework for handling parallelism?

So you have legacy Java with lots of blocking code. Your options:

  • stop using coroutines, go with regular threads
  • rewrite legacy in non-blocking way
  • play with Dispatchers.IO or custom dispatchers. Something like Executors.newXXX().asCoroutineDispatcher(). Bad tradeoff: either waste your memory or stumble upon deadlocks.
  • wait for Loom release
  • give up

Thank you for this information. It’s a difficult choice, as I have several different requirements weighing against each other. I will experiment with each one of these approaches and see which ones are reasonable.

Loom does sound like the most appealing, although it seems that it will take a while for it to arrive.

I believe that what you need are Flows. If you publish some example code we may be able to help you. Also, I recommend that you read Roman Elizarov’s Medium blog, where he explains the basics of coroutines and much more!

Have you ruled out Java parallel streams on ForkJoinPool?

Yes, I have not given streams any thought. The reason for this is that while my example linearly accessed the members of the array, in practice access is random access, and not all elements are accessed.

The design is significantly more complex than it may seem by just looking at the example above. The actual code is open source, and you’re welcome to have a look. However, it may not be immediately obvious what’s going on.

@lamba92 was asking what the code looks like, so I will share it here with a brief explanation. I don’t actually expect anyone to spend their personal time understanding my code base, so please don’t assume that I want people to solve my problems for me.

At its core, there is the type APLValue. Instances of this interface represent scalar values or arrays. The property dimensions contains the size of the array (if it’s empty, the value is a scalar value).

When the APLValue instance is an array, it’s either an actual array of values (implemented by APLArrayImpl) or it’s a composite value, for example the deferred result of a computation. In other words, if the user wishes to add two arrays by typing a+b, the system will not immediately add these arrays, but instead construct an instance of ArraySum2Args which contains references to both arguments as well as a function that is to be applied on its members.

The method valueAt(p: Int) returns the value for a given slot in an array, and the implementation of this method in ArraySum2Args is what actually performs the computation. In other words, results are not computed until they are needed.

These deferred computations can of course be stacked, such as when a user types a+b+c.

Now, a common operation is to actually realise the underlying values of an array. In other words, force the computation of all the values. This is implemented by the method collapse. This method recursively descends the stack of deferred values to return an array of all values.
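To make the description above concrete for readers who don’t want to open the repository, here is a heavily simplified sketch of the idea. The names SimpleValue, SimpleArray and LazySum are invented for this illustration and are not the project’s actual types. They only mirror the roles of APLValue, APLArrayImpl and ArraySum2Args, using plain Long values and a one-dimensional size.

```kotlin
// Hypothetical, simplified stand-in for the APLValue interface.
interface SimpleValue {
    val size: Int
    fun valueAt(p: Int): Long
}

// A realised array that simply stores its values.
class SimpleArray(private val values: LongArray) : SimpleValue {
    override val size: Int get() = values.size
    override fun valueAt(p: Int): Long = values[p]
}

// A deferred sum: nothing is computed until valueAt is called.
class LazySum(private val a: SimpleValue, private val b: SimpleValue) : SimpleValue {
    init { require(a.size == b.size) { "size mismatch" } }
    override val size: Int get() = a.size
    override fun valueAt(p: Int): Long = a.valueAt(p) + b.valueAt(p)
}

// Forces every element, descending through any stacked deferred values.
fun SimpleValue.collapse(): SimpleArray = SimpleArray(LongArray(size) { valueAt(it) })

fun main() {
    val a = SimpleArray(longArrayOf(1, 2, 3))
    val b = SimpleArray(longArrayOf(10, 20, 30))
    val c = SimpleArray(longArrayOf(100, 200, 300))
    val sum = LazySum(LazySum(a, b), c) // a+b+c, no additions performed yet
    println(sum.valueAt(1))             // computes only slot 1
    println(sum.collapse().valueAt(2))  // collapse computes every slot
}
```

The loop inside collapse is the part I want to run in parallel, and each valueAt call it makes may in turn want to parallelise its own work.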

The plan all along has been to implement parallel evaluation of the collapse function. But before doing so, I found myself implementing a specific operation that could benefit from parallelism, so I posted a question here not long ago asking for design advice around coroutines. Here’s the discussion: Coroutines design advice

As a result, I ended up with the implementation of valueAt for ScanResult1Arg.

This implementation worked fine, until I decided to implement parallelisation for collapse. This code has not been committed to GitHub yet. Its implementation looks similar to the outer loop in the example I posted in the initial post. Once this was implemented, everything worked fine, until the test case that tests the “scan” operator on large arrays. This is when the deadlock happened, because the two implementations are not compatible with each other.

Here are the implementations of the functions mentioned above:

APLValue: https://github.com/lokedhs/array/blob/master/array/src/commonMain/kotlin/array/types.kt#L64
ArraySum2Args: https://github.com/lokedhs/array/blob/master/array/src/commonMain/kotlin/array/builtins/math_functions.kt#L34
collapse: https://github.com/lokedhs/array/blob/master/array/src/commonMain/kotlin/array/types.kt#L88
ScanResult1Arg: https://github.com/lokedhs/array/blob/master/array/src/commonMain/kotlin/array/builtins/reduce.kt#L118

You can’t do what you want without making computeString a suspend function and getting rid of all the runBlocking. You similarly can’t do it in Java without making a similar transformation.

You don’t need to make everything suspend, but you do have to convert anything that currently has a runBlocking in it, and all callers of those things, transitively until you get out to a non-dispatcher thread.
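Applied to the test case from the first post, a sketch of that conversion might look like the following. The name computeAll is an assumption for illustration. Both levels of parallelism suspend instead of blocking, and the only runBlocking left is at the boundary with the non-coroutine main thread.

```kotlin
import kotlinx.coroutines.*

// Inner parallel step, now suspending instead of wrapped in runBlocking.
suspend fun computeString(index: Int): String = coroutineScope {
    val number = async(Dispatchers.Default) { index }.await()
    "String from: $index. Number: $number"
}

// Outer parallel step (illustrative name): awaiting suspends the coroutine
// and frees the dispatcher thread to run other coroutines.
suspend fun computeAll(count: Int): List<String> = coroutineScope {
    (0 until count)
        .map { i -> async(Dispatchers.Default) { computeString(i) } }
        .awaitAll()
}

fun main() {
    // The single remaining runBlocking runs on the main thread, which is not
    // a dispatcher thread.
    val results = runBlocking { computeAll(50) }
    results.forEach { println("Result: $it") }
}
```

Because nothing parks a Dispatchers.Default thread here, the count can exceed the number of cores without the nested use locking up.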

You seem reluctant to do this for performance reasons, but it looks to me like the performance of your code will actually improve significantly when you do this. Entering a runBlocking context is pretty expensive, and you do it all over the place. Also, you are not generally optimizing your code to the extent that you’d notice the relatively small overhead of calling a suspend function. runBlocking costs about 100 times more.

The cost of calling a suspending function is basically the cost of creating a small object like a singleton list, or performing a string addition, or any number of other things that you do without worrying about it.

Actually, it’s not for performance reasons I don’t want to do it. In fact, I tried doing so today, in order to see how it went.

The first issue is that almost every single function in this project leads to a call to valueAt. Making this function suspending means that almost all other functions must be suspending too.

This in itself isn’t much of a problem actually, since as you correctly noted, the performance impact isn’t great enough for it to matter.

The real problem is that I have functions such as asSequence that return a lazily evaluated Sequence of APLValue instances. The Sequence class isn’t suspending, so my subclass can’t be either. There are more examples of cases where I subclass various Kotlin and Java APIs that end up calling valueAt.

Another problem is that valueAt is not only used internally throughout the entire code base. It’s also part of the external API. Of course, nothing prevents me from making the external API suspending as well, but then we have the question of how to pass the correct CoroutineContext, since lower-level functions need to create new coroutines. The documentation recommends against using CoroutineScope(coroutineContext) in functions that are part of an external interface.
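For the Sequence case specifically, one possible direction is the Flow suggestion made earlier in the thread, since a flow builder body is allowed to call suspend functions. This is only a sketch under assumptions: SuspendingSquares is an invented stand-in for a value whose valueAt has been made suspending, and it does nothing for subclasses of non-suspending Java APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical, simplified stand-in for a value with a suspending valueAt.
class SuspendingSquares(val size: Int) {
    suspend fun valueAt(p: Int): Long =
        withContext(Dispatchers.Default) { p.toLong() * p }

    // Lazy like asSequence, but elements are produced by suspending calls
    // and only when a collector asks for them.
    fun asFlow(): Flow<Long> = flow {
        for (i in 0 until size) emit(valueAt(i))
    }
}

fun main() = runBlocking {
    // Only the first three elements are computed.
    println(SuspendingSquares(5).asFlow().take(3).toList())
}
```

Callers would then have to collect from a suspending context, which brings back the same question of how far the suspend modifier has to spread.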