I’ve written this JVM code so that a suspend function can call code that blocks on IO: the coroutine suspends while the blocking work runs, and the suspend function’s host thread is never blocked.
My goals:
- meant to be called from a suspend function when it needs to call code that blocks on IO
- do not block the current coroutine’s host thread
- the coroutine should suspend when this is called
- the action passed in (the blocking code) should run on Dispatchers.IO
- the function should not return until the action is done or the coroutine is cancelled
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutionException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor
import kotlinx.coroutines.suspendCancellableCoroutine

/**
 * Utility for moving from `suspend` code to non-`suspend` code ([action]) that blocks on IO.
 * Call this from a `suspend` function that needs to execute code that blocks on IO.
 * This will ensure that your suspend function doesn't block its host thread but suspends
 * instead.
 *
 * Typical usage:
 * ```
 * suspend fun mySuspendFun() {
 *     suspendIO { legacyNonSuspendingService.doIO() }
 * }
 * ```
 *
 * @param action non-suspending code that blocks on IO
 */
@Suppress("unchecked_cast")
suspend inline fun <T> suspendIO(
    crossinline action: () -> T,
): T =
    suspendCancellableCoroutine { continuation ->
        CompletableFuture
            // NOTE Dispatchers.IO.asExecutor() is just a cast at run-time
            .supplyAsync({ action() }, Dispatchers.IO.asExecutor())
            .let { future: CompletableFuture<T> ->
                continuation.invokeOnCancellation {
                    future.cancel(true)
                }
                future.whenComplete { value: T?, throwable: Throwable? ->
                    if (throwable !== null) {
                        if (throwable is CompletionException || throwable is ExecutionException) {
                            continuation.resumeWithException(throwable.cause!!)
                        } else {
                            continuation.resumeWithException(throwable)
                        }
                    } else {
                        continuation.resume(value as T)
                    }
                }
            }
    }
I’ve verified (with tests) that this code does meet my goals. I’m wondering if there’s a much better way of doing this.
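For reference, the “simpler” withContext-based alternative that the replies below compare against can be sketched roughly as follows (`suspendIOSimple` is a hypothetical name; assumes kotlinx.coroutines on the classpath):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Hypothetical sketch of the withContext-based equivalent of suspendIO.
// withContext suspends the calling coroutine and runs `action` on a thread
// from the IO dispatcher; cancellations and failures propagate according to
// the normal coroutine rules.
suspend fun <T> suspendIOSimple(action: () -> T): T =
    withContext(Dispatchers.IO) { action() }
```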
Yes, this is basically why Dispatchers.IO was invented in the first place - to allow calling blocking code from a suspending context without blocking the calling coroutine’s thread. And yes, it propagates cancellations and failures according to general coroutine rules.
There may be minor differences in behavior between your code and the above, but they should be generally the same. Also, using Dispatchers.IO directly provides a nice optimization - it avoids switching between threads if possible. If we dispatch manually, as in your code, maybe it still provides this optimization, but I’m not sure.
Well, you’ll notice that my code is using the Dispatchers.IO thread pool.
But I don’t believe the simple suggestion handles cancellation, thread switching, or coroutine suspension as well as what I have.
First, I’m not sure the simpler code guarantees that the coroutine will suspend during the execution of the IO code.
Second, if the incoming host thread is already a Dispatchers.IO thread (and remember that its thread pool shares threads with the Dispatchers.Default thread pool), then the simple code doesn’t even guarantee that the execution will switch to another thread.
If I’m right about either of those things, then I’m pretty sure cancellation isn’t going to work smoothly… and possibly worse things could happen.
But I may be wrong about both those things. The docs aren’t super-clear.
If withContext blocked the calling thread, this main function would take around 20 seconds to run. If it suspends the calling coroutine instead, it should take around 4 seconds. Give it a try and see what happens.
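A sketch of that experiment (hypothetical; five coroutines each “blocking on IO” for 4 seconds via Thread.sleep, launched from runBlocking’s single-threaded event loop):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.system.measureTimeMillis

// Runs [n] coroutines, each blocking for [sleepMs] inside
// withContext(Dispatchers.IO), from a single-threaded runBlocking event loop,
// and returns the total elapsed time in milliseconds. If withContext blocked
// the caller thread, the sleeps would serialize (n * sleepMs total); if it
// suspends the caller, they run concurrently (~sleepMs total).
fun timedConcurrentSleeps(n: Int, sleepMs: Long): Long = runBlocking {
    measureTimeMillis {
        (1..n).map {
            launch { withContext(Dispatchers.IO) { Thread.sleep(sleepMs) } }
        }.joinAll()
    }
}

fun main() {
    // ~4 seconds if the caller suspends; ~20 seconds if it were blocked.
    println("took ${timedConcurrentSleeps(5, 4_000)} ms")
}
```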
I can guarantee that withContext works perfectly well. It’s the core of our asynchronous code at my job. We wrap all of our HTTP/SQL/NoSQL calls inside withContext, throw it all on one big thread pool, then do all our async work using async, launch, and Flows, and it all just works.
It guarantees we don’t block any thread that wasn’t meant to be blocked, e.g. the main thread. What’s your concern exactly? Any code example that would show the problem?
And this is a feature, not a bug. They implemented this optimization intentionally, so we avoid switching threads whenever possible, while still making guarantees about not blocking caller threads.
I see this the other way around. withContext(Dispatchers.IO) is the standard way to handle blocking code from Kotlin coroutines. It was provided by the Kotlin authors and has been proven by years of use by many people. We could try implementing alternative approaches if we have specific needs, but then: dragons ahead. It is easy to miss corner cases. Most importantly, your implementation doesn’t create a parent-child relationship between both sides; it opts out of the standard structured concurrency model. That could be good or bad.
Regarding cancellations: cancelling blocking I/O is generally tricky. We can cancel the coroutine, we can future.cancel(true), we can interrupt the thread, and the operation will ignore all of it. It is blocked, after all. Your implementation may differ from the simple one in that, on cancellation, it “orphans” the blocking operation and releases the caller coroutine straight away (but I’m not sure it does this), while the simple implementation will still have to wait for the blocking I/O. This follows from the above: the standard approach uses structured concurrency, which guarantees not leaking any background tasks.
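Worth noting in this context: kotlinx.coroutines ships runInterruptible, which stays inside structured concurrency (the caller still waits) but converts cancellation into a Thread.interrupt() on the worker thread. A sketch (`interruptibleIO` is a hypothetical name):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible

// Sketch: on cancellation of the calling coroutine, the IO thread running
// `action` is interrupted. Interrupt-aware blocking calls (Thread.sleep,
// NIO InterruptibleChannel operations, etc.) then abort promptly instead of
// running to completion in the background.
suspend fun <T> interruptibleIO(action: () -> T): T =
    runInterruptible(Dispatchers.IO) { action() }
```

Unlike the orphaning approach, this still waits for the interrupted block to finish throwing, so no background work leaks.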
Well, saying that your code is not following structured concurrency is not entirely correct. It is not using the standard built-in model, but most of your code is actually about parent-child responsibilities.
My point is: I believe we can’t easily cancel/interrupt some or most blocking I/O operations. They are blocked and unresponsive. We can choose between suspending the caller until the I/O fails, which makes cancellations ineffective (withContext(Dispatchers.IO) does this), or leaving the blocked background operation as it is, orphaning it, and letting the application progress - which technically leaks background tasks and is a little bit against the structured concurrency concept. I have sometimes decided to do this, and potentially your code is doing this, but I’m not sure without checking. That potentially differentiates it from the simpler solution.
The argument that my complex code may not be a net win for the cost may be a valid one.
The simpler code may spare me from switching threads. But in the situations where I’m using this function (when I know that the code is about to block on IO), I think I explicitly do not want that optimization.
I may be overcompensating due to having seen subtle, difficult-to-find threading issues come up in code that “has been running fine in production for years.”
Again, I’m open to my solution being overkill and really appreciate you all engaging with me on this.
But let’s say, for the sake of discussion, that I very explicitly want (1) to execute the IO-blocking code on another thread and (2) to guarantee that the coroutine suspends.
If I understand how suspendCancellableCoroutine and the JVM’s CompletableFuture.supplyAsync work, then I’ve accomplished both of those with my code.
Also, I’m pretty sure that my code complies with all the guidance about structured concurrency. It should be properly using the continuation inside the suspendCancellableCoroutine block.
My thought is that since I know that the code I’m passing in as the action blocks:
- I don’t want the thread-sharing optimizations provided by the simple solution, and
- my handling of cancellations may have benefits over what happens in the simpler code, since I know for certain that the IO-blocking code is not actually blocking a coroutine, or any thread that might need to do anything other than block on IO. Therefore it can be interrupted externally and might benefit from that early interruption (in terms of quickly releasing resources).
I don’t follow this. Every time we do withContext(Dispatchers.IO) we assume we’ll do a blocking operation, and still we want the optimization.
If we need to control explicitly which thread is doing the I/O, the usual way with coroutines is to create a new dispatcher:
val ioDispatcher = Executors.newFixedThreadPool(64).asCoroutineDispatcher()
...
withContext(ioDispatcher) { ... }
Both solutions work similarly on these characteristics. They both suspend the caller coroutine, they both block a thread in the I/O pool, they both propagate cancellations, and both effectively fail to cancel the I/O itself (as blocking I/O is not easily cancellable). Again, there is a difference in that your code will orphan the work still happening in the background, while withContext will wait for it to fail or finish cancelling. This could be desirable or not, depending on the case.
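A filled-out version of the dedicated-pool idea above (the pool size of 64 and the names are arbitrary; an ExecutorCoroutineDispatcher owns real threads, so it should be closed on shutdown):

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext

// A dedicated pool guarantees that the blocking work runs on one of these
// 64 threads, never on the caller's thread.
val myIoDispatcher = Executors.newFixedThreadPool(64).asCoroutineDispatcher()

// Hypothetical helper: suspend the caller and run the blocking action on the
// dedicated pool.
suspend fun <T> onMyIoPool(action: () -> T): T =
    withContext(myIoDispatcher) { action() }

// On application shutdown:
// myIoDispatcher.close()
```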
I don’t understand what you’re trying to achieve, then. If you’re not using launch or async, then you’re effectively running single-threaded code, and it doesn’t matter if the current thread gets blocked. If you’re only ever running one coroutine at a time on a thread, then it doesn’t matter if the coroutine blocks rather than suspending.
If the optimization occurs in the simpler solution, then the caller thread is blocked on IO. Also, it isn’t clear that the coroutine would necessarily suspend in that case.
Maybe someone needs to convince me that those two things aren’t important. Right now, I still think they may be important.
Honestly, I can’t make a great argument or come up with a simple example showing that they are important.
However, I’ve heard some Kotlin coroutine experts say that it is not a good idea to block the caller thread of a suspend function. I figure they’ve thought it through more deeply than I could.
So, if they’re wrong, here, I need that explained to me with a pretty high degree of subtlety.
My application has many thousands of coroutines. What I’m concerned about right now is what happens when one of them wants to run code that blocks on IO.
I want it not to block the calling thread and I want it to suspend.
My code guarantees that.
There’s another conversation happening about whether those two goals are important.
Your test shows that coroutines run asynchronously. That’s cool, just not what this discussion was meant to be about.
Yes, they were right. And then not just coroutine experts but the coroutine authors themselves implemented this optimization, and you don’t trust it.
You are generally correct. This optimization means we may technically block the caller thread. And yes, blocking the thread inside a coroutine is generally bad. But that is not “the rule” itself; it is not the root problem. The problem happens because some threads are meant to stay block-free. They could be the main thread of a UI application, or threads for CPU-intensive workloads (Dispatchers.Default).
Kotlin’s optimization is simple: whenever we switch a coroutine Default→IO, we don’t move the coroutine to another thread; we make the current thread an I/O thread, while another I/O thread becomes a “Default” thread to compensate. And the opposite: if we switch IO→Default and there is an unoccupied Default thread, that thread becomes I/O and our thread becomes Default. This way we avoid switching threads while still guaranteeing we keep a healthy fleet of Default threads.
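That thread re-labelling can be observed directly. A sketch (the two names are typically identical under the optimization, though this is not contractually guaranteed, so don’t assert on it):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the carrier-thread names before and after a Default -> IO switch.
fun threadNamesAcrossSwitch(): Pair<String, String> = runBlocking {
    withContext(Dispatchers.Default) {
        val before = Thread.currentThread().name
        withContext(Dispatchers.IO) {
            before to Thread.currentThread().name
        }
    }
}

fun main() {
    val (default, io) = threadNamesAcrossSwitch()
    // Both are usually "DefaultDispatcher-worker-N", often with the same N.
    println("Default: $default, IO: $io")
}
```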
The example by @Skater901 and mine (I believe they were effectively the same) showed that we can call multiple blocking operations without impacting the responsiveness or concurrency of coroutines - as long as we use withContext(Dispatchers.IO).
Let’s say you have a coroutine, we’ll call it coroutine 1, running on thread A. This coroutine wants to run some blocking code. Calling withContext(Dispatchers.IO) creates a new coroutine (call it coroutine 2) and suspends coroutine 1. It is guaranteed that coroutine 1 will suspend until coroutine 2 completes, at which point coroutine 1 will resume.
As for threads:
If thread A is the main thread then coroutine 2 will be scheduled to run on a thread from the IO dispatcher’s thread pool (call it thread B), and thread A won’t be blocked.
If thread A is in the IO dispatcher’s thread pool, then thread A’s reason to exist is to run blocking code. So running coroutine 2 on thread B instead of A doesn’t provide any benefit – you still have a thread in the IO dispatcher being blocked. Running coroutine 2 on thread A is also faster, since it doesn’t have to switch threads first.
If thread A is in the default dispatcher’s thread pool, well, that’s the same thread pool as the IO dispatcher. Once again there’s no point in switching to thread B; the dispatcher can just “move” thread A from the default dispatcher to the IO dispatcher and continue running on thread A.
So I’d argue that you don’t “explicitly want to execute the IO-blocking code on another thread”.