Calling blocking code in coroutines

In the coroutines guide there are a lot of good examples; unfortunately, they’re fairly contrived. Specifically, none of them really demonstrate blocking (but non-CPU-bound) calculations.

Let’s suppose you want to make ProcessBuilder more coroutine-friendly, while fully taking advantage of parallelism. Might such a solution look like the following?

class ProcessExecutor(private val executor: Executor) {
    suspend fun execute(processBuilder: ProcessBuilder): Int =
            CompletableFuture.supplyAsync(Supplier {
                Thread.sleep(1000) // simulating a slow command
                processBuilder.start().waitFor()
            }, executor).await()
}

fun main(args: Array<String>) {
    val executor = Executors.newFixedThreadPool(4)
    runBlocking {
        val processBuilder = ProcessBuilder("cmd", "/C", "dir")
                .inheritIO()
        val result = ProcessExecutor(executor)
                .execute(processBuilder)
        println("Result: $result")
    }
    executor.shutdown()
}

Is this the best way to do this? If so, it raises a couple interesting points. One is you actually need two threadpools (er, executors) – one for doing the non-coroutine work (where CompletableFutures get submitted), and another for the coroutines, though you might use CommonPool. Correct?

Another question/point – as far as I can tell, submitting futures to threadpools and awaiting them seems by far the most pragmatic way of integrating with blocking code. Is that correct, or is there another, possibly better way to accomplish the same thing?

1 Like

You are on a right track, except that there is a general rule that you never need to use a CompletableFuture with coroutines (you can, but you don’t have to).

First of all, there is nothing inherently “coroutine unfriendly” in the ProcessBuilder APIs. Yes, it blocks the thread and that maybe unfortunate for certain applications, however it does not preclude their usage from inside of coroutines in anyway. If you are writing a backend application that is using a thread-pool, then it is not usually a big problem to block a few of those threads. You can just go ahead and use blocking APIs from coroutines! Not a big deal most of the time.

There are domain-specific exceptions to the above suggestion.

If you have a high-load application that is handling thousands of users and you must ensure that it stays responsive, despite the fact that some of the background jobs may block their respective thread, then you should indeed create several thread pools. Usually a thread pool per the kind of blocking work that you are going or per external service. In Enterprise world it is quite a common place that you get to integrate with some 3rd party API and all you have is a JAR file to this API and a blocking-only way to invoke it. In this case, you should create a separate thread-pool context for each API of that kind, so that failures in one API does not affect other operations of you application. You would then use run function from kotlinx.coroutines to switch into that context whenever you do a corresponding blocking operation. In your example, instead of val executor = Executors.newFixedThreadPool(4), I’d suggest to write:

val processContext = newFixedThreadPoolContext(4)

Now, your execute suspending function shall be written using run into this processContext:

suspend fun execute(processBuilder: ProcessBuilder): Int = run(processContext) {
    Thread.sleep(1000) // simulating a slow command
    processBuilder.start().waitFor()
}

Look ma, no futures!

Another use-case is when you have a UI application and you cannot block its UI thread. You can have any number (thousands) of coroutines running in UI thread, doing animations, running logic of NPCs in your game, etc, but no one of them can ever block, or else the whole UI thread freezes. So, if you need to use some blocking API from the UI-confined coroutine, then, again, you use run to switch to a different pool. For example:

launch(UI) { // start my coroutine in UI context that I must never block
     // I can do animations here
    delay(1000) // I can do suspending functions here
    // But I cannot use processBuilder.start().waitFor(), because it is blocking, so
    run(CommonPool) { // switch out of UI thread, UI thread will be suspended
        // now I can block a thread of CommonPool, but my UI will not freeze
    }
    // now I'm back to UI thread, must not do blocking stuff anymore
}
4 Likes

Gotcha. Yeah, that makes sense. That seems like two ways of accomplishing a similar thing – either create a thread pool directly and submit futures to it, or create a new thread pool context and just block directly. Are those practically equivalent? Or…is it somehow possible for another coroutine to get stalled behind a coroutine that’s blocking? I’m assuming when a coroutine becomes resumable it goes onto a queue of some sort; I don’t know if that queue is for the entire context (one big ConcurrentLinkedQueue), or per thread (as I believe ForkJoinPool does), or something else entirely.

I also wouldn’t be so quick to dismiss this as a purely ultra-high-performance use case. If you’re blocking CommonPool (for instance) in even a small number of tasks, I’d think you’d be losing out on the entire purpose of coroutines. Even in smaller apps, that’s gotta hurt.

Also, I appreciate the UI/game example. Amusingly, the API for client code would likely be the same as the thread-based approach – expose a runOnUiContext method that accepts a function.

The coroutine context is backed by a regular thread pool. The newFixedThreadPool creates a new instance of ScheduledThreadPoolExecutor, while CommonPool uses ForkJoinPool.commonPool. You can check the corresponding source code. It is pretty straightforward.

When coroutine suspends, it frees the thread to the next task of the thread pool. When corotutine resumes, it is scheduled to execute on the thread pool of its context again.

When you use run, the coroutine that invokes run is suspended, so it frees its thread, while the execution is sheduled onto the pool of the target context.

1 Like

Sorry to dig up an old thread. I’m trying to understand how cancellations work in the case of blocking code in coroutines.

Ideally, cancellation of a job would interrupt the thread running the blocking code, or at least allow the blocking code to complete but ignore it’s results/exceptions. I tried using @elizarov 's second example from above, switching to a newSingleThreadContext but doing so causes the job to no longer be cancelable. Calling job.cancel() returns true but the coroutine keeps executing.

In trying to debug this, I tried using run along with suspendCancellableCoroutine but the outcome was the same.

Submitting the block to an executor has the behavior I expect:

suspend fun <T> runInExecutor(executor: Executor, block: () -> T): T = suspendCancellableCoroutine { cont ->
    executor.execute {
        try {
            cont.resume(block())
        } catch (e: Throwable) {
            if (!cont.isCompleted) cont.resumeWithException(e)
        }
    }
}

However, I can’t seem to figure out why the run() solution doesn’t work. Is run() thinking CommonPool and newSingleThreadContext are the same and not actually suspending?

Here’s a rather contrived example demonstrating what I’m talking about: Blocking coroutine behavior · GitHub

Cancelling a coroutine does not interrupt a thread. This is done so by design, because, unfortunately, many Java libraries incorrectly operate in interrupted threads. You can use Thread.interrupt safely only for the code that you know well / wrote yourself. It is also not a good idea to interrupt a thread that you don’t own.
Btw, java.util.concurrent.Future.cancel has a boolean mayInterruptIfRunning parameter.

If you have a piece of blocking code that you know as being well-behaved when thread interruption flag is set, you’ll have to write your own adapter to covert coroutine cancellation into thread interruption.

Having said that, it might be a desirable feature to optionally support thread interruption for newSingleThreadContext and its friends (including Executor.asCoroutineDispatcher extension), so that you can create your own context that you know is going to be used with interruptible blocking code. I’ve created the corresponding change request: Support optional thread interrupt in newSingleThreadContext as its friends · Issue #57 · Kotlin/kotlinx.coroutines · GitHub

1 Like

Thanks @elizarov that makes since

I meet a same problem.
For example:


val singleThread = newSingleThreadContext(name)

//workA
launch(singleThread) {
...
}

//workB
launch(singleThread) {
...
}

Now I know if the workA and workB is finish singleThread is not close,I want know that how can I
close the singleThread safely.

Hi,

I came across this thread when googling and was wondering what the correct way to do this is in 2020, using Kotlin 1.3.72, kotlinx.coroutines 1.3.8 and IntelliJ IDEA 2020.2? This does not compile:

val processContext = newFixedThreadPoolContext(5, "asdf")
suspend fun execute(processBuilder: ProcessBuilder): Int = run(processContext) {
    Thread.sleep(1000) // simulating a slow command
    processBuilder.start().waitFor()
}

It complains on the run:

e: /Users/simon/code/test-kotlin-coroutines/src/main/kotlin/test/kotlin/coroutines/App.kt: (19, 60): Type inference failed: inline fun <R> run(block: () -> R): R
cannot be applied to
(ExecutorCoroutineDispatcher)

e: /Users/simon/code/test-kotlin-coroutines/src/main/kotlin/test/kotlin/coroutines/App.kt: (19, 64): Type mismatch: inferred type is ExecutorCoroutineDispatcher but () -> Int was expected
e: /Users/simon/code/test-kotlin-coroutines/src/main/kotlin/test/kotlin/coroutines/App.kt: (19, 80): Too many arguments for public inline fun <R> run(block: () -> R): R defined in kotlin

When instead using runBlocking, it seems to work; however, IntelliJ IDEA warns for “Inappropriate blocking method call”. runInterruptible is the other thing I could find, and seems to work fine – so is that what I should use?

IntelliJ IDEA also warns about newFixedThreadPoolContext:

NOTE: This API will be replaced in the future. A different API to create thread-limited thread pools that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed will be provided, thus avoiding potential thread leaks and also significantly improving performance, due to coroutine-oriented scheduling policy and thread-switch minimization. See issue #261 for details. If you need a completely separate thread-pool with scheduling policy that is based on the standard JDK executors, use the following expression: Executors.newFixedThreadPool().asCoroutineDispatcher(). See Executor.asCoroutineDispatcher for details.

I tried this:

val ctx = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
fun main(args: Array<String>) {
    GlobalScope.launch { // launch a new coroutine in background and continue
        runInterruptible(ctx) {
            Thread.sleep(1000L)
        }
        println("World!")
    }
    println("Hello,")
    Thread.sleep(2000L)
}

However, the effect then was that the program never exited, I guess some thread was kept alive.

Is not clear why you’re using Thread.sleep here at all. You probably want to look into the coroutine nonblocking equivalent to sleep : “delay(dalyinmillis)”

The Thread.sleep’s used in this thread are just placeholders for blocking code.

Have your tried using a “withContext(ctx)” block instead of runInterruptible? I believe I’ve used that in the past to run blocking IO inside a coroutine.

1 Like