What resumes the coroutine when it is suspended

I’m trying to understand how Kotlin coroutines work.

First, let’s start with an example in Java.
Say we have a thread A that at some point needs to do an I/O operation (such as reading a file or reading data from a network request). At this point, as I understand it, the CPU becomes free for other threads (or processes). How it works: the CPU triggers the I/O operation and lets the DMA controller and the hard disk (or network card) do their job, so the CPU is free to do work for other threads. During this operation, thread A has to wait. When the I/O operation is done, the DMA controller sends an interrupt to the CPU to signal completion. With this notification, the CPU knows that thread A can be resumed.

Now let’s get back to Kotlin coroutines.
I understand how a suspend function works; I read about its state machine under the hood. The suspend function just returns at the suspension point. The continuation is passed to something else, which resumes it later; the continuation contains the state, so the coroutine can continue from exactly where it left off. The question: what is this something else that triggers the resume? There must be a thread doing that, right? What is that thread? Is there a dedicated thread that observes all suspended coroutines, continually checks which of them can be resumed, and triggers the resume?
For example, take the similar situation where a coroutine needs to do an I/O operation. It runs on a thread; when the I/O operation starts, the coroutine has to wait and is suspended. That thread then executes another coroutine. When the interrupt comes, I think there must be a thread that takes the CPU to resume the coroutine. What is that thread?
For the delay function, it schedules a timer to resume the suspended coroutine. That timer runs on a dedicated thread, which is shared by all coroutines that call delay. For I/O operations, I guess there is another thread with the same responsibility, isn’t there?


As you pretty much said yourself, that highly depends on the case or on the reason to suspend.

  • Joining another coroutine or using synchronization utilities - one coroutine resumes another.
  • Bridging coroutines with APIs based on futures, callbacks or similar - the callbacks resume the coroutine.
  • Asynchronous or non-blocking I/O - there is usually some kind of I/O thread, managed by the I/O framework, that is used to notify about events.
  • Blocking I/O using Dispatchers.IO - it uses a shared thread pool to schedule the operation (which blocks one of the pool’s threads), and when that thread is awakened by the OS, it resumes execution in the original thread (actually, the original dispatcher).
  • Delay - this is more complicated. I believe we check whether the current dispatcher is backed by a ScheduledExecutorService and, if so, use its functionality. Otherwise, I believe there is a shared global executor based on a custom implementation of an event loop that supports delaying, but I’m not sure how it works. I guess when targeting JavaScript, delay() is probably handled by setTimeout() or something similar.
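
To make the delay case concrete, here is a minimal sketch of the general idea only. It is not how kotlinx.coroutines implements delay(); sketchDelay, the timer executor and runDelayDemo are names I made up for illustration. It uses just the standard library: the coroutine hands its continuation to a shared timer thread, and that thread is the one that resumes it.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical shared timer thread: every caller of sketchDelay uses it.
val timer = Executors.newSingleThreadScheduledExecutor { r ->
    Thread(r, "sketch-timer").apply { isDaemon = true }
}

// Hypothetical stand-in for delay(): capture the continuation and let the
// timer thread call resume once the time has elapsed.
suspend fun sketchDelay(millis: Long): Unit = suspendCoroutine { cont ->
    timer.schedule(Runnable { cont.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
}

// Starts a coroutine with no dispatcher and reports which thread ran the
// code after the suspension point.
fun runDelayDemo(): String {
    val done = CountDownLatch(1)
    var resumedOn = ""
    val body: suspend () -> Unit = {
        sketchDelay(50)
        resumedOn = Thread.currentThread().name
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
    done.await()
    return resumedOn
}

fun main() {
    println("resumed on: ${runDelayDemo()}")
}
```

Because there is no dispatcher in the context here, the code after sketchDelay simply continues on the timer thread. With a real dispatcher, resume would instead hand the continuation back to that dispatcher.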

It’s not exactly as you describe. Delay is a special function that checks whether the current dispatcher installed in the coroutine context supports delayed scheduling (implements Delay)…
But to get back to your question: the thread that resumes a coroutine is the one that calls Continuation.resume.
When you want to suspend a coroutine, you use the suspendCoroutine primitive, which gives you the Continuation that you need to invoke to resume the coroutine.
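
A small sketch of that, using only the standard library. fetchAsync is a hypothetical callback-based API invented for this example, and runFetchDemo is just a helper to start a coroutine without any dispatcher. The point is that the code after the suspension point runs on whichever thread invoked the callback, because that is the thread calling resume.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API: works on its own thread, then calls back.
fun fetchAsync(onDone: (String) -> Unit) {
    thread(name = "callback-thread", isDaemon = true) {
        Thread.sleep(50)              // stands in for real work
        onDone("payload")
    }
}

// Bridge: suspendCoroutine hands us the Continuation; the callback resumes it.
suspend fun fetch(): String = suspendCoroutine { cont ->
    fetchAsync { value -> cont.resume(value) }
}

// Returns (thread before suspension, thread after resumption, fetched value).
fun runFetchDemo(): Triple<String, String, String> {
    var before = ""
    var after = ""
    var payload = ""
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        before = Thread.currentThread().name
        payload = fetch()             // suspends here; the caller's thread is released
        after = Thread.currentThread().name
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
    done.await()
    return Triple(before, after, payload)
}

fun main() {
    val (before, after, payload) = runFetchDemo()
    println("before=$before after=$after payload=$payload")
}
```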
Imagine the implementation of Mutex as the following:

  • Coroutine A calls lock. Lock suspends the coroutine using suspendCoroutine and checks the state of the mutex, which is unlocked. It changes the state of the mutex to locked and resumes coroutine A immediately by calling Continuation.resume, because A acquired the lock.
  • Coroutine B calls lock. Lock suspends the coroutine using suspendCoroutine and checks the state of the mutex, which is locked. Because it is locked, it doesn’t resume coroutine B, but stores the Continuation internally instead.
  • Coroutine A calls unlock. Unlock checks whether some other coroutine is suspended waiting for the lock (checks if there are stored continuations) and finds the Continuation of coroutine B, so it invokes that continuation, resuming coroutine B. Notice that this happens from the thread on which coroutine A is running, which could be any thread.
  • Coroutine B calls unlock. Unlock checks whether some other coroutine is suspended waiting for the lock. There are no other coroutines, so it changes the state from locked to unlocked.

It is not exactly like this, but more or less. Suspension is just capturing the continuation of a coroutine and resuming it when a certain condition is met (or after a certain time in the case of delay).
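
The four steps above can be sketched roughly like this. SketchMutex is a made-up, heavily simplified class (no cancellation, no fairness guarantees, plain synchronized for its own state), not the real kotlinx.coroutines Mutex. The demo runs both coroutines on one thread with no dispatcher, so the order of events is deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical simplified mutex, for illustration only.
class SketchMutex {
    private var locked = false
    private val waiters = ArrayDeque<Continuation<Unit>>()

    suspend fun lock(): Unit = suspendCoroutine { cont ->
        val acquired = synchronized(this) {
            if (!locked) { locked = true; true } else { waiters.addLast(cont); false }
        }
        // Free mutex: resume immediately. Otherwise the continuation stays stored.
        if (acquired) cont.resume(Unit)
    }

    fun unlock() {
        val next = synchronized(this) {
            val waiter = waiters.removeFirstOrNull()
            if (waiter == null) locked = false   // nobody waiting: really unlock
            waiter                               // else hand the lock over
        }
        next?.resume(Unit)   // the waiter is resumed on the unlocking thread
    }
}

fun runMutexDemo(): List<String> {
    val events = mutableListOf<String>()
    val mutex = SketchMutex()
    fun completion() = Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() }

    val b: suspend () -> Unit = {
        events += "B calls lock"
        mutex.lock()                       // suspends: A holds the lock
        events += "B acquired"
        mutex.unlock()
        events += "B released"
    }
    val a: suspend () -> Unit = {
        mutex.lock()                       // resumed immediately
        events += "A acquired"
        b.startCoroutine(completion())     // B runs until it suspends in lock()
        events += "A calls unlock"
        mutex.unlock()                     // B continues right here, on A's thread
        events += "A done"
    }
    a.startCoroutine(completion())
    return events
}

fun main() {
    runMutexDemo().forEach(::println)
}
```

Note how "B acquired" and "B released" are recorded in the middle of A's unlock call: with no dispatcher involved, resuming B is just a function call made by A.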

In the case of IO, at least in the standard case, the coroutine does not suspend, because the IO is blocking. What you generally do is wrap the blocking IO in a withContext(Dispatchers.IO).
withContext suspends the current coroutine and runs the inner block in a new coroutine on the given dispatcher (in this case IO, which uses a large thread pool). The new coroutine blocks the underlying IO thread until completion, and then resumes the original coroutine (by calling continuation.resume). In this way the original coroutine is not blocked, but suspended.
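
A minimal way to observe this, assuming kotlinx-coroutines-core is on the classpath. runIoDemo is a helper name I made up, and Thread.sleep stands in for a real blocking read. It records the thread name before, inside and after withContext.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread names seen before, inside, and after withContext.
fun runIoDemo(): Triple<String, String, String> = runBlocking {
    val before = Thread.currentThread().name
    val inside = withContext(Dispatchers.IO) {
        Thread.sleep(50)                 // blocks a pool thread, not the caller
        Thread.currentThread().name
    }
    val after = Thread.currentThread().name
    Triple(before, inside, after)
}

fun main() {
    val (before, inside, after) = runIoDemo()
    println("before=$before inside=$inside after=$after")
}
```

The exact thread names depend on the environment, but the inner block should report a pool worker thread, while the code before and after runs on the thread driving runBlocking.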


Just want to share my opinion; I hope it gives you some input:

Delay is just a suspend function. IMO it works the same way as postDelay: it enqueues a block that needs to run at a specific time (inside this block, the callback invokes the continuation to resume the coroutine).

Regarding your question: something else = Continuation interface = CPS (continuation-passing style)

A suspend function is just a normal function with the ability to pause and resume at suspension points. The compiler does the trick by transforming the original function:

  • It adds a Continuation parameter, which holds the state, as the last parameter of the original function. The purpose of this continuation is to resume → call the original function again with a different state so it continues exactly where it left off (in sequence) → to me it looks like splitting the function into multiple messages/call stacks, then enqueuing them again so they are ready to be picked up by a thread pool.

So IMO what happens at the suspension point → the function just releases the call stack (by returning) to let other messages in the queue have a chance to run (instead of blocking with a deep call stack as before).
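
Here is a hand-written imitation of that transformation, to show the "return at the suspension point, re-enter later with a different state" idea. It is only a sketch: the real generated code differs in detail and uses the COROUTINE_SUSPENDED marker from kotlin.coroutines.intrinsics. Greet, pause, parked, trace and SUSPENDED are all names invented for this example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Imitates, by hand, roughly what the compiler would do for:
//   suspend fun greet() { trace += "step 1"; pause(); trace += "step 2" }

val trace = mutableListOf<String>()
val SUSPENDED = Any()                      // made-up marker for "I suspended"
var parked: Continuation<Unit>? = null     // whoever holds this decides when to resume

// Hypothetical suspending primitive in CPS form: store the continuation, report suspension.
fun pause(cont: Continuation<Unit>): Any {
    parked = cont
    return SUSPENDED
}

class Greet(private val completion: Continuation<Unit>) : Continuation<Unit> {
    override val context: CoroutineContext = EmptyCoroutineContext
    private var label = 0                  // which suspension point we stopped at

    // Runs the body from the saved label; returns SUSPENDED or Unit.
    fun run(): Any = when (label) {
        0 -> {
            trace += "step 1"
            label = 1
            pause(this)                    // plain return: the call stack unwinds
        }
        1 -> {
            trace += "step 2"
            Unit
        }
        else -> error("already finished")
    }

    // Called by whoever resumes us: re-enter the body at the saved label.
    override fun resumeWith(result: Result<Unit>) {
        result.getOrThrow()
        if (run() !== SUSPENDED) completion.resume(Unit)
    }
}

fun runStateMachineDemo(): List<String> {
    trace.clear()
    val machine = Greet(Continuation(EmptyCoroutineContext) { trace += "completed" })
    machine.resumeWith(Result.success(Unit))   // first entry: runs up to pause()
    trace += "caller is free"                  // the stack was released by a return
    parked?.resume(Unit)                       // the "something else" resumes it
    return trace.toList()
}

fun main() {
    runStateMachineDemo().forEach(::println)
}
```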

To really understand what happens, nothing is better than looking into the coroutines source code itself, but that is a challenging task. I’m looking for a high-level design document, but it looks like one doesn’t exist yet. The best book so far on this topic is Kotlin Coroutines Deep Dive by Marcin Moskała.
