Repeated calls to suspendCoroutine seem to break Coroutine scheduling

Hey there!

Here’s a weird thing that took me considerable time to figure out: calling suspendCoroutine in a loop seems to break Coroutine scheduling in some cases. Specifically, I have this reduced example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.time.Instant
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

fun main() {
    val queue: LinkedBlockingQueue<String> = LinkedBlockingQueue()

    thread {
        while (true) {
            queue.put(Instant.now().toString())

            Thread.sleep(1000)
        }
    }

    runBlocking {
        launch {
            while (true) {
                val value = suspendCoroutine<String> {
                    it.resume(queue.poll(1, TimeUnit.SECONDS))
                }

                println("value = $value")
            }
        }

        launch {
            while (true) {
                println("Working")

                delay(1000)
            }
        }
    }
}

This simulates what I was trying to do in a larger project. Basically, I launch several background tasks, one of which loads events from a remote source (this is essentially what the queue represents). If you run this example, you’ll see that the program continues to print value = ... but never prints Working from the other background job:

value = 2021-12-07T11:12:02.205919200Z
value = 2021-12-07T11:12:03.224662200Z
value = 2021-12-07T11:12:04.234728Z

Now, if I change suspendCoroutine to just use a delay in the first worker, everything works fine. In particular, you can replace the loop body that contains suspendCoroutine with this:

val value = queue.poll()

if (value == null) {
    delay(1000)
} else {
    println("value = $value")
}

In this case both worker jobs correctly print their output:

value = 2021-12-07T11:11:33.831285500Z
Working
value = 2021-12-07T11:11:34.847789800Z
Working
value = 2021-12-07T11:11:35.854931400Z
Working
value = 2021-12-07T11:11:36.864445900Z
Working

Now, my question: Is this normal behavior? Is this expected if one calls suspendCoroutine in a loop like this? Why does calling suspendCoroutine also impact the other coroutine job that was launched separately?

I’d be glad to learn what I did wrong and where my understanding of coroutines was incomplete :)

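I believe this is not because of running suspendCoroutine() in a loop, but because you block the thread using poll(). The block passed to suspendCoroutine() runs synchronously on the current thread, so the coroutine does not actually suspend while poll() is waiting. You should almost never block the thread when working with coroutines, because it may cause other coroutines to become unresponsive, as in your example.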

Thanks for the response! Somehow my understanding was that because suspendCoroutine can be used to convert synchronous code to coroutines, using a blocking call there is not an issue. In my mind suspendCoroutine would allow other coroutines to continue execution especially given that Dispatchers.Default is backed by a JVM thread pool. I guess that is wrong?

Also the first example works when using launch(Dispatchers.IO). This to me would indicate that calling suspendCoroutine can in fact block coroutines running on Dispatchers.Default from executing. Is that correct?

Invoking synchronous code from coroutines is always tricky because that code was designed specifically to block. So yes, use launch(Dispatchers.IO) { } or withContext(Dispatchers.IO) { } to handle that.
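To illustrate the withContext(Dispatchers.IO) suggestion, here is a minimal sketch. The helper name pollOnIo and the pre-filled queue are made up for the example; only the withContext call itself is the point.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs the blocking poll on a Dispatchers.IO thread, so the
// caller's thread stays free. Returns null if nothing arrives within one second.
suspend fun pollOnIo(queue: LinkedBlockingQueue<String>): String? =
    withContext(Dispatchers.IO) {
        queue.poll(1, TimeUnit.SECONDS)
    }

fun main() = runBlocking {
    val queue = LinkedBlockingQueue<String>()
    queue.put("first")
    queue.put("second")

    // This coroutine shares the runBlocking thread with the polling loop below.
    val worker = launch {
        repeat(3) {
            println("Working")
            delay(100)
        }
    }

    repeat(2) {
        println("value = ${pollOnIo(queue)}")
    }
    worker.join()
}
```

The call still blocks a thread, just one from a pool that is meant to be blocked, so the runBlocking thread remains available for the other coroutine.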

In your case though, if you replace the queue with a kotlinx.coroutines.channels.Channel you will solve all the issues.
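A rough sketch of what the Channel version could look like. The function name relayThroughChannel and the fixed list of events are assumptions for illustration; in the real program the producer would be the event source.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends the given events through a Channel and returns
// everything the consumer received. Both send and receive suspend instead of
// blocking, so no thread is held while waiting.
fun relayThroughChannel(events: List<String>): List<String> = runBlocking {
    val channel = Channel<String>()
    val received = mutableListOf<String>()

    // Producer: suspends on send() until the consumer is ready.
    launch {
        for (event in events) channel.send(event)
        channel.close() // ends the consumer's for loop
    }

    // Consumer: iterating a channel suspends while it is empty.
    for (value in channel) received += value
    received
}

fun main() {
    println(relayThroughChannel(listOf("a", "b"))) // prints [a, b]
}
```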

I think I’ve found the main point I was confused about: I thought the coroutines in my example ran on the thread pool that Kotlin provides for coroutines. But runBlocking runs its coroutines on an event loop on the calling thread, and a plain launch inside it inherits that context. Everything therefore runs single-threaded, so calling queue.poll blocks all the other coroutines.
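One way to check this is to compare the threads that coroutines run on. The sketch below uses a made-up helper, observeThreads, and compares Thread objects rather than names, since thread names can carry extra debug suffixes.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the thread used by a coroutine that inherits the
// runBlocking context, and the thread used by one started on Dispatchers.Default.
fun observeThreads(): Pair<Thread, Thread> = runBlocking {
    val inherited = async { Thread.currentThread() }
    val onDefault = async(Dispatchers.Default) { Thread.currentThread() }
    inherited.await() to onDefault.await()
}

fun main() {
    val caller = Thread.currentThread()
    val (inherited, onDefault) = observeThreads()
    println("inherited context ran on the caller's thread: ${inherited === caller}")
    println("Dispatchers.Default ran on another thread: ${onDefault !== caller}")
}
```

Both lines should report true: without an explicit dispatcher, child coroutines share the single thread that called runBlocking.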

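Still, even calling queue.poll on Dispatchers.Default ties up one of that pool’s threads, and the pool is only as large as the number of CPU cores (with a minimum of two), so a few blocking calls can stall other coroutines on machines with few cores. So the best solution is to use launch(Dispatchers.IO) to move the blocking code to the IO thread pool.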

Actually I’m using SharedFlow as I have a single event source but multiple receivers and thus need broadcasting semantics. Otherwise I agree!
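For the broadcasting case, a minimal sketch with MutableSharedFlow might look like this. The broadcast function and its parameters are hypothetical; it assumes a non-empty event list, because take() requires a positive count.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits every event to `receivers` independent collectors
// and returns what each collector saw. `events` must not be empty.
fun broadcast(events: List<String>, receivers: Int): List<List<String>> = runBlocking {
    val flow = MutableSharedFlow<String>()

    // Each collector takes as many items as will be emitted, then completes.
    val collectors = List(receivers) {
        async { flow.take(events.size).toList() }
    }

    // A SharedFlow with no replay drops events nobody is subscribed to yet,
    // so wait until every collector has subscribed before emitting.
    flow.subscriptionCount.first { it == receivers }

    for (event in events) flow.emit(event) // suspends, never blocks
    collectors.map { it.await() }
}

fun main() {
    println(broadcast(listOf("a", "b"), receivers = 2)) // prints [[a, b], [a, b]]
}
```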

The only (?) safe way to perform blocking calls inside coroutines is to use Dispatchers.IO or our own thread (pool) meant for blocking. And this is more of a workaround than a real conversion of blocking code into non-blocking code. It still blocks, but it uses threads that are expected to be blocked.

You are correct that suspendCoroutine() is often used to adapt the existing, non-suspend code and make it suspendable. However, this is not for synchronous/blocking code, but rather for code that is asynchronous and usually based on callbacks.
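As a sketch of that callback use case: fetchEventAsync below is an invented stand-in for some callback-based API, not a real library function. It returns immediately and invokes the callback later from its own thread, which is what makes suspendCoroutine() a good fit.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API: returns right away and delivers the result
// later on a background thread.
fun fetchEventAsync(callback: (String) -> Unit) {
    thread {
        Thread.sleep(100) // simulated work on the API's own thread
        callback("event")
    }
}

// Adapter: the block only registers the callback and returns immediately, so the
// coroutine really suspends and its thread is free until resume() is called.
suspend fun fetchEvent(): String = suspendCoroutine { continuation ->
    fetchEventAsync { value -> continuation.resume(value) }
}

fun main() = runBlocking {
    println(fetchEvent()) // prints event
}
```

If the underlying API supports cancellation, suspendCancellableCoroutine from kotlinx.coroutines is usually the better choice for this kind of adapter.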

In that case, your example becomes confusing to me. I understand your real situation is probably much more complicated than above, but the main point in using Channel or Flow instead of BlockingQueue is that then we don’t have to block in the first place. Both Channel and Flow are consumed in a suspending way, so we don’t need Dispatchers.IO or suspendCoroutine() anymore.