Coroutines and cancelling blocking code

I have to interact with a blocking library from coroutine code and want to be able to cancel the blocking call. The situation is something similar to the following

import kotlinx.coroutines.*

fun main() {
  val jobA = GlobalScope.launch {
    someSuspendedFun()
    println("Done A")
  }
  val jobB = GlobalScope.launch(Dispatchers.IO) {
    someLibraryBlockingFun()
    println("Done B")
  }

  runBlocking {
    delay(500)
    jobA.cancel()
    jobB.cancel()

    joinAll(jobA, jobB)
  }
}

suspend fun someSuspendedFun() {
  delay(1_000)
}

fun someLibraryBlockingFun() {
  Thread.sleep(1_000)
}

here jobB.cancel() doesn’t work because it will not Thread#interrupt the Thread.sleep(...) call, so Done B gets printed. What should I do in this kind of situation?

Thanks

1 Like

I thought I should also post what I’m doing right now, which is something similar to this


import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

val exec = Executors.newCachedThreadPool()

fun main() {
  val job = GlobalScope.launch {
    runCancellableBlocking {
      Thread.sleep(1_000)
    }
    println("Done")
  }

  runBlocking {
    delay(500)
    job.cancelAndJoin()
  }

  exec.shutdown()
}

suspend fun <E> runCancellableBlocking(block: () -> E): E {
  return suspendCancellableCoroutine { cont ->
    val f = exec.submit {
      try {
        cont.resume(block())
      }
      catch (e: InterruptedException) {
        cont.cancel()
      }
      catch(e: Exception) {
        cont.resumeWithException(e)
      }
    }
    cont.invokeOnCancellation {
      f.cancel(true)
    }
  }
}

but it doesn’t seem very idiomatic, so I was wondering if there is a better way.

Take a look at runInterruptible which is exactly built for this purpose.

You can’t cancel blocking code from a coroutine. It is not possible at all. Some libraries check for the thread interuption flag in a cycle. If you want to use that, you need to launch a separate dispatcher for that.

runInterruptible allows to interrupt a coroutine in a suspesion point or during the delay, but not the blocking code.

1 Like

Doesn’t this depend on whether the blocking operation is interruptible or not? For example, runInterruptible() won’t work with Socket, but it should with SocketChannel. Although, it is a matter of what we mean by “blocking”. As I understand, SocketChannel uses non-blocking IO underneath, but it blocks the thread while waiting for IO.

But generally speaking: no, runInterruptible() isn’t a bulletproof solution to running synchronous IO.

Some time ago I explored the topic of how to properly handle synchronous IO with coroutines, especially regarding cancellations. I think this topic isn’t very well documented in official docs (?). I found 3 possible solutions:

  1. runInterruptible() if we know IO is interruptible.
  2. Create an utility that automatically closes a resource on cancellation. Something like: Cancel blocking function after timeout without 'select' - #2 by broot . This seems like the best solution in most cases, but it requires that we know a resource that is related to the IO operation.
  3. Run IO inside a child coroutine and make the parent not wait for it on cancellation. I didn’t find a way to do it properly, but I was able to achieve this with some hacking: Coroutine/Job that doesn't join its children when cancelled

The only way to interrupt the blocking code externally if it does not support this in the API is to set interrupted flag in the thread (like this). Then the code could tread this flag calling isInterrupted or not. From the coroutine in order to use it, you need to run the coroutine on the separate thread and then call interrupt on Job cancelation. The separate thread is required to ensure you do not spoil threads from default thread pool.

Actually, I just checked and runInterrputably is doing just that. So my previous comment was wrong. But still the blocking code must treat thread interruption correctly.

1 Like