Coroutines That Call Functions With Blocking Code

I’m trying to confirm if my understanding of coroutines is correct.

Given that methodThatContainsBlockingCode(i) calls some sort of blocking code, be it JDBC, a REST call, etc.,
then 10 lightweight threads will be created and execution will block until the last one completes.

import kotlinx.coroutines.*

// methodThatContainsBlockingCode(i) is a 'mocked' function that calls Thread.sleep(3_000)
suspend fun process() {
    withContext(Dispatchers.IO) {
        for (i in 1..10) {
            async { methodThatContainsBlockingCode(i) }
        }
    }
}

When I watch the code execute, it takes about 3 to 4 seconds total for them to complete and for process() to return to the calling function. This is great, as without the coroutines it would take > 30 seconds.

Some reading/videos/Medium posts have led me to believe that I'm going to have to wait for the methods in the coroutines to finish before the method returns. Is this correct?

Is there a way to return immediately while the code continues to execute? The reason I'm asking is that foo() is being called from a Spring RestController, and I would really like to return to the client immediately so the client does not have to wait. I'm dealing with legacy code here, and refactoring to a reactive approach is not feasible.

FYI, I did get this to work with a CompletableFuture, but it seems the completeOnTimeout method doesn't execute :frowning:

fun foo(request: TheRequest) {
    val ids = checkIds(request.ids)

    ids.forEach { id ->
        this.log.info("Executing work with id: $id")
        CompletableFuture.supplyAsync { barService.execute(id) }
            .thenAccept { result -> barService.logWorkComplete(result) }
            //.completeOnTimeout(timeOutError(), 1, TimeUnit.HOURS)
    }
}

You don’t really use any features of coroutines here. As a matter of fact, you opted out of their most important benefits. If we need to simply schedule background I/O operations and we don’t care when or whether they complete, then the easiest way with coroutines is GlobalScope.launch(Dispatchers.IO) {}. But it is not much different from creating our own executor with Executors.newFixedThreadPool() and submitting to it, or even simply doing thread {}.
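
For illustration, here is a rough sketch of those three equivalent ways to fire off background work - blockingWork() is a made-up placeholder for your JDBC/REST call:

import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.util.concurrent.Executors
import kotlin.concurrent.thread

fun blockingWork() = Thread.sleep(3_000) // stand-in for JDBC, a REST call, etc.

@OptIn(DelicateCoroutinesApi::class) // GlobalScope is marked as a "delicate" API
fun scheduleInBackground() {
    // 1. Coroutines: fire-and-forget on the IO dispatcher
    GlobalScope.launch(Dispatchers.IO) { blockingWork() }

    // 2. A plain executor, as we did before coroutines
    val executor = Executors.newFixedThreadPool(4)
    executor.submit { blockingWork() }

    // 3. Or simply a new thread per task
    thread { blockingWork() }
}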

CompletableFuture.supplyAsync {} is actually not a good idea here, because it uses ForkJoinPool.commonPool(), and we should not block threads in that pool.
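
If you stay with CompletableFuture, a minimal sketch of the usual fix is to pass your own executor, so the blocking work never lands on the common pool (doBlockingWork() and the pool size are made up):

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// a dedicated pool for blocking calls, so ForkJoinPool.commonPool() stays free for other work
val blockingPool = Executors.newFixedThreadPool(16)

fun doBlockingWork(id: Int): String {
    Thread.sleep(3_000) // stand-in for JDBC / REST
    return "result for $id"
}

fun submitWork(ids: List<Int>) {
    ids.forEach { id ->
        CompletableFuture
            .supplyAsync({ doBlockingWork(id) }, blockingPool) // the second argument selects the executor
            .thenAccept { result -> println("done: $result") }
    }
}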


Thank you, @broot. Please forgive my ignorance, as I am new to Kotlin and even newer to coroutines. What are the important features that I’m opting out of?

The code example in the first snippet is taken from a couple different tutorials.

I have seen the usage of GlobalScope.launch(Dispatchers.IO) {}, but I’m concerned about using that in an HTTP request, especially if a request to the endpoint is made while another request is running. Am I overthinking that?

We benefit from coroutines the most if we have multiple tasks that often have to wait for each other, so they need to be synchronized, or if we frequently switch between I/O, CPU-intensive processing and/or event loops. In your case you simply delegate an I/O operation to a background thread - internally, the coroutines framework will do pretty much the same thing we always did when using executors and thread pools.

Coroutines also provide structured concurrency, which means we can easily wait for other tasks, errors and cancellations are propagated automatically, etc. - but you said you actually want to avoid that.

What is your concern exactly?


The use of GlobalScope seems dangerous in an HTTP context. But then again, I’m new to Kotlin and coroutines, so maybe I’m overthinking it?

Anyway, is my understanding correct that blocking code called in a coroutine will block the coroutine from finishing, and that I will have to wait for the code to finish before any follow-on code is executed?

Thank you for your time!

You didn’t really explain what you mean. You only repeated what you already said - that it is somehow “dangerous”. How is it dangerous?

I think I don’t get your question. Yes, this is how programming languages work in general - we execute a line of code and when we finish, we execute the next one. Coroutines aren’t any different.

If you meant the code that invoked GlobalScope.launch(), then no, it doesn’t wait for the launched coroutine to finish.

I’ll chime in and see if I can give a real quick Coroutines 101 crash course.

The structured concurrency stuff that broot is talking about is (as far as I understand) a fancy, complicated way of saying that “by design, when you start a coroutine, the method that started it won’t finish until the coroutine does”. A quick example:

suspend fun myCoroutineMethod() {
    coroutineScope {
        launch { doAThing() } // Launch returns a Job we can wait for or check status or whatever
        doAnotherThing()
        doAThirdThing()
    }
}

In the above code, we don’t explicitly wait for the launch { doAThing() } to finish, but the way coroutines work, the coroutineScope lambda won’t finish until the launch has finished.

Using GlobalScope is discouraged, because it is going against that fundamental coroutine philosophy of “you wait for what you start”. If you’re using GlobalScope, you are effectively firing and forgetting, which means if the coroutine you launch fails, or runs forever… what happens? Do you deal with it? So ideally you should know what you’re doing before using GlobalScope.

I think another thing that broot is saying is that one of the strengths of coroutines is that, as long as you’re not using blocking code, you don’t have to worry about thread management or threads at all, really. You can just start firing off coroutines in your method, write other code, and it all just works. But in this case, the only thing you’re trying to do is launch a task in the background, so you don’t really need to use coroutines or get any benefit out of it. Using something like a CompletableFuture, or even submitting the work to an ExecutorService might be a better choice.

If you give us more context over what you’re trying to do, we can probably provide better guidance. :slight_smile:

Nitpick: we do wait for launch (docs) to finish. :slight_smile: But it finishes rather quickly, as it launches doAThing() in the background and then returns without caring for the result of doAThing() - like when using Promise / Future (although async (docs) would be the better analogy here).

We do not wait for doAThing() to finish, before executing doAnotherThing() and then doAThirdThing(), but as already said, coroutineScope (docs) will wait for every child to finish, so it and in turn myCoroutineMethod will return only when doAThirdThing() and doAThing() have returned (or thrown), in whatever order.


About GlobalScope (docs): it’s marked as @DelicateCoroutinesApi, so it’s not really deprecated, but also maybe not really suitable for common use. The caveat mentioned in the docs is that GlobalScope is (as the name implies) valid and running throughout the lifetime of your JVM application. So it could be that an operation, e.g. network I/O, is blocked forever / for a very long time, and it won’t get cleaned up by structured concurrency (which is what you normally rely on when programming with Kotlin coroutines).

If you have a matching lifecycle where you can start and end a coroutine scope (which is the thing a group of coroutines is managed in, so to say), the better approach would be to create your own scope and end it appropriately. You can create one at the start of your lifecycle with CoroutineScope() (docs) and then cancel() (docs) it at the end of your lifecycle.
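
A minimal sketch of that lifecycle idea (the class and method names are made up; the SupervisorJob is one common choice so that a single failing child doesn’t cancel its siblings):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

class SyncComponent {
    // created when the component starts...
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    fun scheduleSync(id: Int) {
        scope.launch {
            // blocking or suspending sync work for this id
        }
    }

    // ...and cancelled when the component shuts down, stopping all still-running children
    fun shutdown() = scope.cancel()
}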


Regarding blocking[1] stuff: the thread on which a coroutine (your lines of code) is actually executed will be determined by a CoroutineDispatcher (docs), which is part of a coroutine context and thus can be changed via e.g. withContext(Dispatchers.IO) { ... }.

There is Dispatchers.Default (docs), which is used by default (surprise!). It spawns a number of threads on which coroutines can run (between 2 and #cores[2]). The main thing to note here is: if you schedule a blocking operation on such a thread, the thread will be blocked, so you have one less slot for a coroutine that could run in parallel. It is important to schedule only coroutines that may suspend but do not block onto this dispatcher; otherwise Dispatchers.Default will be starved of slots to run coroutines in and progress will halt.

Luckily there is Dispatchers.IO (docs), which is intended for blocking operations, such as (non-async) I/O - hence the name. It spawns up to 64 or #cores threads on demand (whichever is higher, so probably 64). This is an extra pool of slots where coroutines can run. Note that this one can also fill up, but if 64 I/O operations are pending, then something severe might be wrong… (you should always set reasonable timeouts on blocking operations).

That’s why it’s important to use e.g. withContext(Dispatchers.IO) { ... } to separate the blocking things (like calling a coroutine-unaware library which does something blocking under the hood, be it in Java or Kotlin) from the ‘normal’ coroutine things.
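
To make that concrete, a small sketch - loadPlanBlocking() is a stand-in for any blocking, coroutine-unaware library call:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// pretend this is a coroutine-unaware library call that blocks its thread
fun loadPlanBlocking(id: Int): String {
    Thread.sleep(1_000)
    return "plan-$id"
}

suspend fun loadPlan(id: Int): String =
    withContext(Dispatchers.IO) { // hop onto the IO pool for the blocking part
        loadPlanBlocking(id)      // Dispatchers.Default stays free for suspending work
    }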


I hope this clears things up a bit and removes some concerns. For more information, I suggest reading through the majority of the official Coroutines Guide (up until Select expression).


  1. blocking here means that a whole thread cannot progress further

  2. number of CPU cores

I had to read your nitpick a few times to figure out what you were even nitpicking. I think what I wrote is clear, but now that I understand where you’re coming from, I’ll re-word what I said to make it clearer.

When we use launch { ... }, it returns a Job, similar to how calling submit on an ExecutorService returns a Future. The difference is that with a coroutine Job, even if we don’t call join(), the surrounding CoroutineScope will wait for the Job to finish, because the Job is effectively attached to that CoroutineScope. Whereas with a Future, if you don’t call get(), the code that submitted the work to the ExecutorService will finish and return, and the Future will be left running with nobody caring whether it ever completes, or whether it completes successfully or with an error. You can fire off work with coroutines to run in the background, but you opt in to that, whereas with Futures fire-and-forget is the default behaviour and you have to remember to opt in to waiting for the work to complete.
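
A small side-by-side sketch of that difference (the work is just a sleep/delay placeholder):

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.Executors

suspend fun withCoroutines() {
    coroutineScope {
        launch { delay(1_000); println("coroutine work done") } // Job attached to this scope
        // no join() needed: coroutineScope will not return until the Job completes
    }
}

fun withExecutor() {
    val executor = Executors.newSingleThreadExecutor()
    executor.submit { Thread.sleep(1_000); println("future work done") } // Future ignored
    // this function returns immediately; nothing waits for the Future unless we call get()
}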


@Skater901 @frozenice @broot
Please allow me to start off by apologizing for taking so long to get back to this thread and thank you all for your input!

Now, I will start by trying to describe the task that, for the moment looks like I have completed. The code is currently “behaving” :slight_smile:

I’m working on a project that is syncing up data between different domains. Let’s refer to them as domains A, B, and C. The code to sync domains A and B is already in place (this is where the blocking code exists, which amounts to a call to domain A, a call to the DB of domain B, and then sending messages to a Kafka topic so domain C can do its work).

Here’s where I come in: I have a REST API endpoint that accepts 0 to N IDs, where each ID should exist in each domain. Each ID represents a plan, which will have multiple features, and millions of people could be subscribed to a plan.

Since we’re potentially dealing with millions of records here, and we don’t want a user to have to wait for a response, we opted to go with coroutines.

Here’s what I finally got to work/behave. The API endpoint returns in milliseconds, and we observe the data being synced in domain B to match domain A.

@Service
data class PlanSyncService(var subscriptionSyncService: SubscriptionSyncService,
                           var planRepository: PlanRepository
    ) : CoroutineScope by CoroutineScope(Dispatchers.IO) {

    private val log = LoggerFactory.getLogger(javaClass)
    fun syncPlans(request: PlanSyncRequest) {
        val actualPlanIds = checkPlanIds(request.planIds)
        actualPlanIds.map { planId -> async {
               subscriptionSyncService.planSync(planId)
            }
        }
    }

    fun checkPlanIds(planIds: Set<UUID>?) : Set<UUID> {
        return if(planIds.isNullOrEmpty()) {
            planRepository.allPlanIds
        } else {
            planIds
        }
    }
}

We haven’t experienced any crashes in our non prod environments. But we’re curious how this is going to scale in prod.

Anyways, thank you again!

Again, you don’t really use any advantages specific to coroutines here. You could simply create a thread pool and submit to it. But well, coroutines are yet another way to do the same thing, so why not.

Also, this code is not really rocket science - you simply schedule fire-and-forget tasks in the background. Not much to discuss here.

I think you should consider cases like:

  • High rate of requests for multiple IDs, so you queue more and more and more background tasks. Initially, the delay for syncing will increase to minutes, then hours, at some point the service will crash due to OOM.
  • Failures when sending requests to subscriptionSyncService - nobody knows about it, the service “thinks” it synced the data, but it didn’t.
  • Duplicated sync operations for the same data.
  • Overloading the subscriptionSyncService - currently it will run ~64 concurrent requests from a single instance of your service. It may be too much, it may not make sense, but maybe it is fine.
  • Overloading Dispatchers.IO - that means any other part of your application using Dispatchers.IO will have problems doing anything. I suggest using a separate thread pool for such “aggressive” scheduling (see the sketch after this list).
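
For the last point, a rough sketch of what a dedicated pool could look like (sizes are made up; limitedParallelism may require opting in to experimental APIs on older kotlinx.coroutines versions):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.Executors

// Option 1: a view over Dispatchers.IO that never uses more than 8 of its threads
val syncDispatcher = Dispatchers.IO.limitedParallelism(8)

// Option 2: a completely separate pool, wrapped as a dispatcher
val syncPoolDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()

val syncScope = CoroutineScope(SupervisorJob() + syncDispatcher)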

Also… a data class which is a service and a coroutine scope at the same time… A data class should be a relatively simple data holder and should (almost?) never be a service. Mixing a service and a coroutine scope also feels strange, but maybe it makes some sense.


Actually, it’s even worse. After even a single failed operation, your whole service becomes entirely broken: it doesn’t do anything anymore, while it still returns 200s saying it is syncing the data.
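
A tiny made-up example of why: the scope in your service is backed by a plain Job, so the first failing child cancels the scope itself, and everything scheduled afterwards is silently dropped.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

fun main() {
    val scope = CoroutineScope(Dispatchers.IO)            // plain Job, not a SupervisorJob
    scope.async { error("planSync failed") }              // this failure cancels the whole scope
    Thread.sleep(200)
    val later = scope.launch { println("never runs") }    // parent is already cancelled
    Thread.sleep(200)
    println("scope still active? ${scope.isActive}")      // false
    println("later job cancelled? ${later.isCancelled}")  // true
}

A SupervisorJob, or handling failures inside each task, avoids that particular failure mode.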

@broot
Thank you for your input.

  1. “The code isn’t rocket science.” This is correct.
  2. “High rate of requests for multiple IDs” - fair point, but not a concern since this isn’t a public-facing API. It’s internal, for company use only.
  3. " Failures when sending requests to subscriptionSyncService". The subscription service is logging errors.
  4. Overloading Dispatchers.IO. This is a very small microservice and the code I am writing is the only code using any part of the coroutine library. We’ll cross this road in the future, if we even need to. As our data becomes more correct, the amount of work needed will decrease.
  5. The endpoint is returning a 202, not a 200.

I left those details out because I felt they weren’t pertinent to the discussion.

Yeah I basically agree with everything broot said. This is pretty standard “launch a job in the background” stuff that doesn’t really need coroutines. I have some thoughts on the overall design of your code/solution, but I’ll keep my feedback limited to the coroutine-related aspects.

Firstly, I would say you almost never want a class to implement CoroutineScope unless that class is some kind of custom coroutine dispatching thingo. What you’re doing here is equivalent to making a service class that implements the ExecutorService interface. It would make more sense and be more readable to add a constructor parameter to PlanSyncService of type CoroutineScope, and then use that to launch your tasks. A quick rewrite would look something like this:

@Service
data class PlanSyncService(
    var subscriptionSyncService: SubscriptionSyncService,
    var planRepository: PlanRepository,
    val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) // a default; ideally inject a scope whose lifecycle you control
) {

    private val log = LoggerFactory.getLogger(javaClass)
    fun syncPlans(request: PlanSyncRequest) {
        val actualPlanIds = checkPlanIds(request.planIds)
        actualPlanIds.map { planId -> scope.async {
               subscriptionSyncService.planSync(planId)
            }
        }
    }

    fun checkPlanIds(planIds: Set<UUID>?) : Set<UUID> {
        return if(planIds.isNullOrEmpty()) {
            planRepository.allPlanIds
        } else {
            planIds
        }
    }
}

IMO, this is much clearer and more obvious, and it looks familiar to Java coders who are used to using executor services to submit work, and the like.

Secondly, strictly speaking you should be using launch, not async; async is for when you want to wait for a value to be returned, while launch is for something that either returns Unit or whose result you don’t care about. Your code will do what you want it to do, but from the perspective of “using the right tool for the job”, you should be using launch. It’s also easier for someone reading your code to figure out that it’s a “fire-and-forget” task when launch is used.
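
A quick sketch of the distinction (the two worker functions are placeholders):

import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun sendAuditEvent() { /* fire-and-forget side effect */ }
suspend fun computeTotal(): Int = 42

suspend fun fireAndForgetStyle() {
    coroutineScope {
        launch { sendAuditEvent() }         // Job: no result, only completion/cancellation
    }
}

suspend fun needTheResultStyle(): Int = coroutineScope {
    val deferred = async { computeTotal() } // Deferred<Int>: a result you intend to read
    deferred.await()
}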

Thirdly, is subscriptionSyncService.planSync() actually a suspending function? If yes, does it have any suspending code inside it? Because if not, then you are literally not using coroutines, but rather just using a thread pool to run blocking code, and you might as well use an ExecutorService. To quote Rick and Morty: this just sounds like thread pools with extra steps.

@Skater901

    // Instead of a constructor argument, how about this?
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

    fun syncPlans(request: PlanSyncRequest) {
        val actualPlanIds = checkPlanIds(request.planIds)
        actualPlanIds.forEach { planId ->
            scope.launch {
                subscriptionSyncService.planSync(planId)
            }
        }
    }

As far as the subscriptionSyncService.planSync(planId) method goes, it’s a method that I didn’t write and that I am currently working with.

It’s roughly something like this; it’s more pseudocode, meant to show that there is no suspending code, but there are multiple instances of blocking code.

fun planSync(planId: UUID): Plan? {

    val domainAPlan = getDomainAPlanGivenId(planId) // in this method there is a blocking call to another microservice

    val domainBPlan = getDomainBPlanGivenId(planId) // this method contains code to query data (blocking) that is owned by this microservice

    // some business logic to compare the two plans, and create an object for updating domain B
    val updatedPlan = repository.updatePlanForDomainB(objectMentionedAbove) // another blocking call

    // some more code to create a message to send to Kafka
    producer.sendMessage(message)

    return updatedPlan
}

Now, if I may, I have a question. Several times in this thread I’ve been told that I’m not really using coroutines. Given that, and the following snippet of an explanation of coroutines - “Coroutines are an alternate way of writing asynchronous programs but are much more lightweight compared to threads. They are computations that run on top of threads.” - am I not even getting the “lightweight” feature with my implementation?

Say three IDs are passed to my service; am I not creating (maybe “spawning” is a more correct term) three coroutines?

Thank you.

We mean that simply launching background tasks doesn’t really require or benefit from coroutines. Your code is pretty much the same as:

val executor = Executors.newFixedThreadPool(64)
executor.submit { ... }
executor.submit { ... }
executor.submit { ... }

We have been doing this since Java 1.5 in 2004 :wink:

Coroutines were invented mostly to be able to wait for something without blocking the thread (but you don’t want to wait, and you also use mostly blocking code) and for structured concurrency (which is the opposite of the fire-and-forget approach). Of course, we can use coroutines just because we like their API for launching background tasks, or to make our code multiplatform. But generally speaking, what you do has little in common with coroutines.
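
For contrast, a sketch of the kind of code coroutines were designed for - suspending waits plus structured concurrency. fetchPlan() is a made-up suspending call, the sort you would get from a coroutine-aware HTTP client:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import java.util.UUID

// pretend this is a suspending call from a coroutine-aware client: it waits without blocking a thread
suspend fun fetchPlan(id: UUID): String {
    delay(1_000)
    return "plan-$id"
}

suspend fun fetchAllPlans(ids: List<UUID>): List<String> = coroutineScope {
    ids.map { id -> async { fetchPlan(id) } } // start all fetches concurrently
        .awaitAll()                           // suspend until every result is in; one failure cancels the rest
}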

The way I understand coroutines, and I might be wrong on this, is that ultimately coroutines are just an event loop. If you have 1,000 coroutines running on 1 thread, you actually only ever have 1 “active” coroutine, and the other 999 are all suspended, waiting their turn.

A CoroutineDispatcher is a thread pool, and it runs coroutines on its threads. So regarding the “lightweight compared to threads” thing, that’s true if you’re running a lot of coroutines on a few threads.

In this case, you’re running blocking code on Dispatchers.IO. From memory, it defaults to 64 threads (or the number of CPU cores, whichever is higher). So if you launch 1,000 coroutines with blocking code and tell Dispatchers.IO to run them, the end result is effectively the same as if you submitted 1,000 Runnables to a thread pool in Java. In fact it might be slightly worse, since in Kotlin each coroutine uses a bit of memory to keep track of its state, whereas with a thread pool, I think each lambda would just get added to a queue, so there’s no extra state being tracked.

Does that help to explain it? Basically, coroutines are lightweight if you run lots of them on a small number of threads. But if you end up with n coroutines running on n threads, you’re not really getting any benefit.
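
The classic demonstration of “lightweight” relies on suspending, non-blocking work - something like this sketch, where ten thousand coroutines share a single thread:

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    repeat(10_000) {
        launch {
            delay(1_000) // suspends; the single underlying thread is free to run other coroutines
        }
    }
    // runBlocking waits for all 10,000 children; total time is roughly 1 second
}

Swap delay for Thread.sleep, though, and you are back to needing one thread per concurrent task, which is exactly the situation with blocking code.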

You have no control over when a non-suspending function returns. It could be calculating pi for 30 seconds or making a REST call - it doesn’t matter. If you are bothered that a REST call blocks for seconds, then use a REST client library that exposes suspending functions.