How do functions indicate their suspension points to coroutines?

I am learning about coroutines and reading the Coroutines and channels tutorial. I have also read this article, which explains a common misconception.

From what I understand so far, coroutines do not inherently turn blocking code into non-blocking/asynchronous code.

I am interested in this part of the tutorial where we introduce suspending functions and concurrency.

What I don’t understand is how the suspending function provided by the Retrofit interface knows at what point to suspend and yield control of the thread to some other coroutine. For example, what would be written in the implementation of this function to tell the coroutine that calls it to pause and resume at the correct time?

interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    suspend fun getOrgRepos(
        @Path("org") org: String
    ): List<Repo>
}

How do I write my own non-blocking code that others can call as a suspending function inside a coroutine and get the non-blocking behavior? Something like:

suspend fun getOrgRepos(..) {
// make network request and somehow suspend/yield control 
// waiting for result
// let the caller know I am ready to be resumed
}

Follow up:
From Roman’s article:

Suspending functions add a new dimension to code design. It was blocking/non-blocking without coroutines and now there is also suspending/non-suspending on top of that

Does this mean that blocking/non-blocking and suspending/non-suspending are completely orthogonal to each other? I cannot resolve this mentally.

Is it correct to say: A piece of code will not block the calling thread if:

  1. It is called using a coroutine builder like async, launch, ~~runBlocking(probably not)~~
  2. The function called inside the builder has the suspend modifier
  3. The implementation/body of the function has followed some conventions. It is not actually blocking the thread and has also somehow communicated to the coroutine context details about when to suspend and resume.

See the coroutines KEEP, which goes in detail through their design.

If you just want to know how something like Retrofit might do this, then it’s simple. It has to rely on a callback-based API that already exists. In other words, it needs some way to “send off a network request” and provide a callback that is invoked when the network request is done. You might be familiar with such callback-based frameworks from Android, from JS Promises, or from another language. This callback-based framework is then used like so:

suspend fun doNetworkRequest(params: NetworkRequestParams): NetworkResponse = suspendCoroutine { continuation ->
  CallbackNetworkLibrary.makeNetworkRequest(params, callback = { response -> continuation.resume(response) },
    onError = { error -> continuation.resumeWithException(error) })
}

suspendCoroutine is simply a suspend fun in the standard library that provides you with a Continuation<T> object. Such an object expects a value of type T, and continues the “rest of the code” at the point of capture of the continuation. In the code above, this means that execution will suspend when doNetworkRequest is called (impl detail: this happens by returning a value COROUTINE_SUSPENDED up the call stack), and then when the request finishes with an error or a response, the Continuation.resumeX method will be called, which will resume the rest of the code (the Continuation object has all the information needed to continue your code execution. It’s basically a stack of all the values you need, but it’s stored as a real object that you can move around). How does it know what thread to resume on? Simple, there’s a ContinuationInterceptor in the continuation’s coroutineContext, which intercepts the resume call, and adds a runnable task to the task queue of the thread that your code is meant to be called on (this might e.g. be the main thread on Android, or maybe some thread pool).
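To make the mechanism concrete, here is a minimal sketch that uses only the Kotlin standard library. FakeCallbackClient and runSuspending are made-up names for illustration (they are not part of Retrofit or any real library), and the "network request" is simulated with a sleeping thread. The point is only to show a callback being bridged to a suspend function via suspendCoroutine.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based client: does its work on its own thread
// and reports back through one of the two callbacks.
object FakeCallbackClient {
    fun fetch(url: String, onSuccess: (String) -> Unit, onError: (Throwable) -> Unit) {
        thread {
            Thread.sleep(100) // pretend to wait for the network
            if (url.isBlank()) onError(IllegalArgumentException("empty url"))
            else onSuccess("response for $url")
        }
    }
}

// The bridge: capture the continuation, hand it to the callbacks, return.
// The caller is suspended until one of the callbacks resumes it.
suspend fun fetchSuspending(url: String): String = suspendCoroutine { cont ->
    FakeCallbackClient.fetch(
        url,
        onSuccess = { body -> cont.resume(body) },
        onError = { error -> cont.resumeWithException(error) },
    )
}

// Tiny stand-in for runBlocking, so the sketch needs no extra dependency.
fun <T> runSuspending(block: suspend () -> T): T {
    val latch = CountDownLatch(1)
    var outcome: Result<T>? = null
    block.startCoroutine(object : Continuation<T> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<T>) {
            outcome = result
            latch.countDown()
        }
    })
    latch.await() // blocks the calling thread until the coroutine completes
    return outcome!!.getOrThrow()
}

fun main() {
    val body = runSuspending { fetchSuspending("https://example.com") }
    println(body) // prints "response for https://example.com"
}
```

There is no ContinuationInterceptor in this sketch, so the code after the suspension point simply runs on whichever thread invoked the callback. A dispatcher in the context is what would move it back to a chosen thread.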


Extreeeeeeeeeeeeeeeeeemely simple explanation: coroutines are basically a giant while loop that runs one at a time.

So let’s say you write some code like this:

runBlocking {
    (0..10).forEach {
        launch {
            delay(500)
            println(it)
        }
    }
}

Basically what happens is the code gets to the forEach, then gets to the launch which creates a coroutine, then it *starts running the code inside the launch. Inside the launch, it hits the delay. At that point, the launch function actually returns and stops executing. So the forEach continues, and calls launch again, creating a second coroutine. This second coroutine runs, but it hits the delay, so the launch returns. It then calls the first launch, passing in the existing state of that function, IE what line it was up to, so that it can run again. The first launch, since it suspended because of a delay, checks if it’s waited long enough. No it hasn’t, so it returns. Then the forEach continues, and creates a third coroutine with another launch. This loop continues until all launches have completed.

Does that kind of make sense? Basically, if you have a single thread, you only have a single coroutine running at one time. When a coroutine suspends, it basically returns out of that function, and it returns the current state of that function. The event loop then calls the next coroutine to run and see how far it can get to.

*Technically speaking I think it creates all the coroutines and then runs the first one, but that’s irrelevant for the purposes of my super simplified explanation.

Now, the important point here is that a coroutine will ONLY suspend and return when it hits a suspension point. This means it doesn’t work with existing blocking code, because blocking code will call a function and wait for that function to complete. So how do we use blocking code with coroutines? Using the amazingly helpful withContext function.

withContext allows us to run code on a different coroutine dispatcher. A coroutine dispatcher is similar to a thread pool. So the easiest way to use blocking code inside coroutines is to use withContext(Dispatchers.IO) { // blocking code goes here }. What this will do is it will run your blocking code on the Dispatchers.IO coroutine dispatcher, which is a special thread pool for handling blocking I/O code. withContext will SUSPEND while it waits for the code on the different dispatcher to finish.
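As a rough sketch of that pattern, assuming kotlinx-coroutines-core is on the classpath: blockingLookup below is a hypothetical stand-in for any blocking call, and lookup wraps it with withContext(Dispatchers.IO). Starting two lookups with async should take roughly the time of one, because each blocking call occupies its own IO thread while the calling coroutine is suspended.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.system.measureTimeMillis

// Hypothetical blocking call standing in for a real blocking API.
fun blockingLookup(id: Int): String {
    Thread.sleep(500)
    return "item-$id"
}

// Suspending wrapper: the blocking work runs on Dispatchers.IO,
// while the caller suspends instead of blocking its own thread.
suspend fun lookup(id: Int): String =
    withContext(Dispatchers.IO) { blockingLookup(id) }

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        val a = async { lookup(1) }
        val b = async { lookup(2) }
        println(listOf(a.await(), b.await())) // prints "[item-1, item-2]"
    }
    // Expect this to be close to 500 ms rather than 1000 ms.
    println("took $elapsed ms")
}
```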

I hope this kind of makes sense. Basically, if you have a callback mechanism, you can do what @kyay10 said. If you don’t, and you have a blocking function, use withContext to run it on a thread pool. Follow these two basic rules, and you should be good to go. :slight_smile:

Do some testing with running code that has Thread.sleep() thrown in, and check that your code actually runs in parallel.


Thank you. This is what I was looking for. I will go through the KEEP.

It has to rely on a callback-based API that already exists

From your example, I understand it a little better, but how would I do it without a callback API? What if I only had a blocking network API to work with?

Will the following achieve the same result as the callback API version?

suspend fun myNetworkRequest(params:.., cb: Callback):Response = suspendCoroutine { cont ->
    thread { // I assume callback-based APIs need to create a new thread to avoid blocking
        val (result, err) = simpleBlockingNetworkRequest(params)
        if (result != null) cont.resume(cb(result))
        else cont.resumeWithException(err)
    }
}

I am failing to appreciate exactly what coroutines make “easy”. Don’t I have to know all these underlying details to truly understand what’s happening when I write my application code?


This would work, but it’s bad practice to create your own thread like that. Instead, you want to use a pre-existing thread pool. Also, you usually don’t provide a callback to a suspend API, since the whole point is that suspend handles callbacks on its own and basically provides you with a direct-style API (i.e. it looks blocking, but it isn’t).
Instead what I’d do is:

suspend fun myNetworkRequest(params:..):Response = withContext(Dispatchers.IO) {
    val (result, err) = simpleBlockingNetworkRequest(params)
    if (result != null) result
    else throw err
}

This will block an IO thread while it waits for the call, but this is fine since that’s exactly what that thread pool is made for (I believe it has some magic optimization or something lol). Crucially, this will unblock the thread that the coroutine was initially running on, and the coroutine will be resumed at the end of the withContext call (think of it as doing exactly what your code is doing, but using a pre-existing thread from a thread pool instead).

Yes I had understood that part but couldn’t understand how a function like delay would be written. After reading @kyay10’s answer, I understand that the magic happens in suspendCancellableCoroutine.

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}
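For comparison, here is a hand-rolled sketch of the same idea. It is not how the library implements delay, and simpleDelay and scheduler are made-up names. It assumes kotlinx-coroutines-core is available and uses a plain ScheduledExecutorService as the timer: schedule a task that resumes the continuation, then return so the coroutine suspends.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.system.measureTimeMillis

// Hypothetical timer thread (daemon, so it does not keep the JVM alive).
private val scheduler = Executors.newSingleThreadScheduledExecutor { task ->
    Thread(task, "simple-delay-timer").apply { isDaemon = true }
}

// Simplified delay: no thread is blocked while waiting.
suspend fun simpleDelay(timeMillis: Long) {
    if (timeMillis <= 0) return
    suspendCancellableCoroutine<Unit> { cont ->
        // The timer resumes the coroutine later; until then it stays suspended.
        val future = scheduler.schedule(
            Runnable { cont.resume(Unit) },
            timeMillis,
            TimeUnit.MILLISECONDS,
        )
        // If the coroutine is cancelled first, drop the pending timer task.
        cont.invokeOnCancellation { future.cancel(false) }
    }
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis { simpleDelay(200) }
    println("resumed after about $elapsed ms")
}
```

The resume call happens on the timer thread, but the dispatcher in the coroutine's context (here the runBlocking event loop) decides which thread actually continues the code after simpleDelay.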

Yeah fair enough. This is in line with my understanding.

This will block an IO thread while it waits for the call, but this is fine since that’s exactly what that thread pool is made for (I believe it has some magic optimization or something lol). Crucially, this will unblock the thread that the coroutine was initially running on, and the coroutine will be resumed at the end of the withContext call (think of it as doing exactly what your code is doing, but using a pre-existing thread from a thread pool instead).

The only final nitpick/clarification I want is: the thread calling the coroutine will be unblocked in both scenarios, right? (The one where I manually create a new thread and also the new version which uses the dispatcher.)

Do you have any wisdom on this bit?

I am failing to appreciate exactly what coroutines make “easy”. Don’t I have to know all these underlying details to truly understand what’s happening when I write my application code?

I get the impression that the main point of coroutines is not to produce asynchronous code but rather to make it easy to glue together existing asynchronous code and reuse a single thread among multiple coroutines.

I think it depends on the exact kind of coroutine you’re dealing with. yield() would be different to delay(), which would be different to withContext. idk if you really need to understand exactly how they work under the hood in order to use them. If you just wanna know cause you’re curious, that’s a different story. :slight_smile:

Regarding your question on what the main point of coroutines is… idk what the Kotlin devs would say, but imo coroutines are really good for two reasons. The first one is being lightweight, compared to having multiple threads, and the second one is the API is REALLY nice to use. (Apart from the awkward detail of having to mark all your functions with the suspend keyword which sucks for third-party integration).

A quick example for how you can use coroutines to make things easy.

class JdbiAccountDAO {
    suspend fun findAccount(accountId: String): Account? {
        return withContext(Dispatchers.IO) {
            // blocking code to find the account
        }
    }
}

class JdbiPostsDAO {
    suspend fun findPosts(accountId: String): List<Post> {
        return withContext(Dispatchers.IO) {
            // blocking code to find the posts
        }
    }
}

class UserService @Inject constructor(
    private val accountDAO: JdbiAccountDAO,
    private val postsDAO: JdbiPostsDAO
) {
    suspend fun getProfile(accountId: String): Pair<Account, List<Post>> = coroutineScope {
        val account = async {
            accountDAO.findAccount(accountId)
                ?: throw IllegalArgumentException("No account found for id $accountId")
        }

        val posts = async { postsDAO.findPosts(accountId) }

        account.await() to posts.await()
    }
}

Imo, that’s very easy to understand and use. Another really good advantage of coroutines is that you can put all the blocking code on one thread pool, and then EVERYTHING else inside your app just runs on a single thread using coroutines. You don’t have to have multiple thread pools to try and make things concurrent, everything Just Works.


Coroutines don’t magically make blocking code non-blocking, and they don’t provide new APIs for I/O or anything like that. Let’s take the example of a web service that loads some data from a DB:

  • thread-per-request pattern - very easy to write a handler as the code is sequential, it executes top-to-bottom. However, performance is far from ideal as we have to spawn a lot of threads.
  • asynchronous - good for the performance, because we run a minimal number of threads and switch between them. Unfortunately, code is ugly, hard to write and maintain, because we have to use callbacks, futures, etc.
  • coroutine-per-request pattern - code is sequential, easy to write and very similar to when using multiple threads. At the same time it provides good performance, as internally it is asynchronous and works the same as in previous example.

Also, I/O is only one use case. Suspending also happens when synchronizing multiple coroutines. Imagine we have a collection of 1000 items and we need to process them concurrently in Java. We create an explicit queue and thread pool, we submit tasks and watch for results. In Kotlin we simply spawn 1000 coroutines, and the queue and thread pool are created implicitly. Or maybe we need to switch the execution between multiple thread pools often? Again, with coroutines we can do this using sequential code. One good example of this is GUIs, where we usually have a main thread. We can alternate between the main and background threads, while keeping the code as simple as: do this; do that.
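The 1000-item case might look roughly like this, assuming kotlinx-coroutines-core. The process function is a made-up placeholder in which delay stands in for any suspending work.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical per-item work; delay() stands in for any suspending call.
suspend fun process(item: Int): Int {
    delay(100)
    return item * 2
}

// One coroutine per item; awaitAll suspends until every result is ready.
suspend fun processAll(items: List<Int>): List<Int> = coroutineScope {
    items.map { item -> async { process(item) } }.awaitAll()
}

fun main() = runBlocking {
    val results = processAll((1..1000).toList())
    println(results.sum()) // prints 1001000, i.e. 2 * (1 + ... + 1000)
}
```

All 1000 coroutines here share the single runBlocking thread, and the whole run should take on the order of one delay rather than 1000 of them, since the coroutines wait concurrently.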

Correct. In both cases the caller of myNetworkRequest is suspended and its thread can do something else.


One thing to keep in mind is that 99% of the code that actually suspends a coroutine is either in the Kotlin library (like delay()) or a 3rd-party library (like Retrofit). If you are writing an app, you will hardly ever use suspendCancellableCoroutine().

Another thing is the “main-safe” paradigm, which is that all suspend functions should be written so they’re safe to be run from the main thread. This mainly (heh) means that if a suspend function wants to do something on a background thread then it should use withContext() to switch to a different dispatcher.

Coroutines are really nice for writing code that looks synchronous, and is therefore easy to read, but that does asynchronous stuff.

launch {
    val result = getDataFromNet()
    val processedResult = doExpensiveProcessing(result)
    updateUI(processedResult)
}

This coroutine does things on three different threads, but you don’t have to worry about that here.
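A sketch of how those three functions could be written to be main-safe, assuming kotlinx-coroutines-core. The function bodies are invented for illustration, and runBlocking on the JVM main thread stands in for a real UI thread, since Dispatchers.Main needs a UI framework.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Main-safe: the blocking part is moved to the IO dispatcher inside the function.
suspend fun getDataFromNet(): String = withContext(Dispatchers.IO) {
    Thread.sleep(200) // hypothetical blocking network call
    "raw data"
}

// Main-safe: CPU-heavy work is moved to the Default dispatcher.
suspend fun doExpensiveProcessing(raw: String): String =
    withContext(Dispatchers.Default) { raw.uppercase() }

// Runs on whatever thread the calling coroutine is on (the "UI" thread here).
fun updateUI(text: String) {
    println("[${Thread.currentThread().name}] $text")
}

fun main() {
    runBlocking {
        launch {
            val result = getDataFromNet()
            val processedResult = doExpensiveProcessing(result)
            updateUI(processedResult)
        }
    }
}
```

The caller never mentions a dispatcher. Each suspend function decides for itself where its work runs, and execution returns to the caller's thread after each withContext block.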


One thing to keep in mind is that 99% of the code that actually suspends a coroutine is either in the Kotlin library (like delay()) or a 3rd-party library (like Retrofit). If you are writing an app, you will hardly ever use suspendCancellableCoroutine().

The problem is that I am using Hibernate to query my DB and it does not offer any non-blocking APIs. So I was hoping to put this DB interaction into a thread on the IO dispatcher and test it out. I now understand I can use the context switch to do so.


I still don’t quite understand this behaviour. Nowhere did I do a withContext or explicitly create a new thread but still my main function prints "not blocked" and then waits for the long running task. This is completely at odds with what I have heard so far…

Expected outcome:
"not blocked" does not get printed before the long running task is finished.

Actual outcome:
"not blocked" gets printed implying that execution doesn’t stop at the async line even though I did not perform a context switch, make a new thread, etc

// IntelliJ warns about redundant suspend fn and also asks to replace sleep with delay.
suspend fun printTask() {
    Thread.sleep(2000)
    println("Finished long running task")
}

fun main() {
    // main thread
    runBlocking {
        // still the main thread
        val x = async { printTask() }
        // why not blocked ?
        println("not blocked")
    }
}

So this is an oddity of coroutines… I think the way they work is that they are actually cold by default. So if you create a bunch of coroutines, none of them start running until a) you finish creating all your coroutines, or b) you wait for the result of a coroutine. Once you do that, it starts running the first coroutine, and as soon as it suspends, it’ll switch to the next coroutine, and so on and so on.


I think @Skater901 added a little to the confusion earlier by suggesting the contents of async are executed directly by the caller of the function. Usually, this is not the case. async/launch just adds the task to the queue, much like submitting to a Java executor. In most cases, it only schedules a task and the code continues straight away.

Coroutines don’t guarantee any specific execution order in this case. We should assume the code inside the async and below run concurrently. In practice, in most cases the code below will be executed first. But we can partially control this behavior (e.g.: Dispatchers.Unconfined, Dispatchers.Main.immediate).
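To see the ordering inside a single-threaded runBlocking, here is a small sketch assuming kotlinx-coroutines-core. It uses CoroutineStart.UNDISPATCHED, a different knob from the dispatchers named above, because it keeps everything on one thread; startOrder is a made-up helper.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which the parent and the child coroutine make progress.
fun startOrder(start: CoroutineStart): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch(start = start) {
            events.add("child started")
            delay(10)
            events.add("child resumed")
        }
        events.add("after launch")
    }
    return events
}

fun main() {
    // DEFAULT: the child is only queued, so the code after launch runs first.
    println(startOrder(CoroutineStart.DEFAULT))
    // prints "[after launch, child started, child resumed]"

    // UNDISPATCHED: the child runs immediately until its first suspension point.
    println(startOrder(CoroutineStart.UNDISPATCHED))
    // prints "[child started, after launch, child resumed]"
}
```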

Please see this modified example, where it is clear that the code below async executed first and blocked the code inside async from starting for 2s:

suspend fun printTask(id: String) {
    Thread.sleep(2000)
    println("Finished long running task: $id")
}

fun main() {
    // main thread
    runBlocking {
        // still the main thread
        val x = async {  printTask("1") }
        printTask("2")
    }
}

HOW DARE YOU I would never explain things in a confusing way. :face_with_tongue: My explanations are always perfect, obviously!!
