In a static API, what would be the best way to launch coroutines?

If I have a backend API that consists entirely of singletons, how can I pick the correct scope to launch coroutines in?

Before, I was using RxJava and never had to manage this. My backend is completely static, so I have no idea where the right place to launch the coroutines would be.

You could just create a coroutine-scope for each singleton and launch the coroutines in that scope or just create one scope and launch everything in that.

As I see it, a scope is a lifecycle-management feature: if you do not start and stop components on the fly, scopes are not really important.

Creating a scope is rather easy, but pay attention to the dispatcher you choose:

val scope = CoroutineScope(Dispatchers.Unconfined)
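To make the "one scope per singleton" idea concrete, here is a minimal sketch. The names AccountService and saveLater are made up for illustration, and I am assuming Dispatchers.Default plus a SupervisorJob rather than the Unconfined dispatcher above; pick whatever fits the work you actually run.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

// Hypothetical singleton; the name and the work it does are placeholders.
object AccountService {
    // SupervisorJob: a failing child does not cancel its siblings.
    // Dispatchers.Default: shared pool for CPU-bound work (assumption; swap in
    // Dispatchers.IO if the coroutines make blocking calls).
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Entry point callable from non-suspending code. The returned Job lets the
    // caller cancel or await this particular task.
    fun saveLater(name: String, onSaved: (String) -> Unit): Job = scope.launch {
        onSaved("saved $name")
    }
}
```

Since the singleton lives as long as the process, the scope is never cancelled here; individual tasks are managed through the Job that saveLater returns.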

As you mention, the scope I need is not part of some class that needs to be cleaned up, so I think the solution will be to create a scope for each singleton.

Don’t forget about structured concurrency. Unless you want one failed Job to cancel everything else, this sounds like the one case where you want GlobalScope. You can do GlobalScope + someDispatcher to create a customized scope with no Job.
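A rough sketch of the difference, assuming nothing beyond kotlinx.coroutines itself. The helper names (scopeWithJob, globalWithDispatcher, siblingSurvives) are invented for this demo: it launches one long-running coroutine and one failing coroutine in the same scope and reports whether the long-running one is still active afterwards.

```kotlin
import kotlinx.coroutines.*

// A scope that owns a regular Job: one failed child cancels the whole scope.
fun scopeWithJob(): CoroutineScope = CoroutineScope(Job() + Dispatchers.Default)

// GlobalScope plus a dispatcher: a scope with no Job, so coroutines are independent.
@OptIn(DelicateCoroutinesApi::class)
fun globalWithDispatcher(): CoroutineScope = GlobalScope + Dispatchers.Default

// Returns true if a sibling coroutine is still active after another one fails.
suspend fun siblingSurvives(scope: CoroutineScope): Boolean {
    // Swallows the demo exception so it is not printed as an uncaught error.
    val quiet = CoroutineExceptionHandler { _, _ -> }
    val sibling = scope.launch { delay(60_000) }
    val failing = scope.launch(quiet) { throw IllegalStateException("boom") }
    failing.join()
    // Give cancellation a moment to reach the sibling, if it is going to.
    withTimeoutOrNull(200) { sibling.join() }
    val survived = sibling.isActive
    sibling.cancel() // clean up the long-running coroutine either way
    return survived
}
```

Calling siblingSurvives(scopeWithJob()) should report that the sibling was cancelled, while siblingSurvives(globalWithDispatcher()) should report that it kept running.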

Also consider if you actually need a scope at all. Could you make your methods suspend? It’ll be much easier to deal with if you can just use the context of your caller.

Of course, all my methods are suspend functions. My only concern is that libraries like KMongo (MongoDB for Kotlin) or other suspending tasks could cause memory leaks if I run them in GlobalScope or in a CoroutineScope held by a singleton.

If all your methods suspend, then why do you need a scope at all? If you want some local concurrency, then coroutineScope { … } is helpful. If you have internal continual background processing, what stops you from managing that internally in the singleton?
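For the local-concurrency case, something like this is what I mean. loadName and loadScore are hypothetical stand-ins for two independent suspending calls; the point is only that coroutineScope borrows the caller's context, so no long-lived scope is needed.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

// Hypothetical stand-ins for two independent repository calls.
suspend fun loadName(id: Int): String { delay(50); return "player$id" }
suspend fun loadScore(id: Int): Int { delay(50); return id * 10 }

// Runs both lookups concurrently in the caller's context. coroutineScope
// returns only after both children finish, and fails if either one fails.
suspend fun loadSummary(id: Int): String = coroutineScope {
    val name = async { loadName(id) }
    val score = async { loadScore(id) }
    "${name.await()}: ${score.await()}"
}
```

Nothing can outlive the call to loadSummary, which is why this form cannot leak.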

It may help if you provide a sample snippet of code to show your use case.

I used to work with Reactor, so I’m not sure how to use coroutines in a static backend like this while avoiding memory leaks.

Example of my project

object MyRepositoryExample {
    suspend fun findByName(name: String)
    suspend fun save(user: Account)
    // ...
}

object MyRepositoryExample2 {
    suspend fun findByNumber(name: String)
    suspend fun getAll(user: Account)
    // ...
}

// First File
class ClassInit : BaseMode {

    lateinit var controllerDemo1: ControllerDemo1
        private set

    lateinit var controllerDemo2: ControllerDemo2
        private set

    override fun onStartup() {
        controllerDemo1 = ControllerDemo1(CallbackManagerRoot.get())
        controllerDemo2 = ControllerDemo2(CallbackManagerRoot.get())
    }
}

// File ControllerDemo1.kt
// Singleton Class
class ControllerDemo1(callbackRootManager: CallbackManagerRoot) // API bridge in C++ using JNI
{
    init {
        callbackRootManager.register(PlayerConnected::class) {
            MyRepositoryExample.findByName(it.player.name) // Cannot execute suspended function
        }

        callbackRootManager.register(PlayerDisconnected::class) {
            MyRepositoryExample.save(it.player.account) // Cannot execute suspended function
        }
    }
}

// File ControllerDemo2.kt
// Singleton Class
class ControllerDemo2(callbackRootManager: CallbackManagerRoot) // API bridge in C++ using JNI
{
    init {
        callbackRootManager.register(PlayerKeyPress::class) {
            if (it.keyPress == 0xFFFFFFFF) {
                val position = it.player.position

                it.player.nodes.asFlow() // non-blocking path node search
                    .filter { node -> position.distance(node.position) < 1.0f }
                    .collect { node ->
                        MyRepositoryExample.save(it.player.account) // Cannot execute suspended function
                    }
            }
        }
    }
}

// File Random.kt
object Random {
    fun randomMainThread() {
        // using MyRepositoryExample2
    }

    fun otherFunctionMainThread() {
        // using MyRepositoryExample2
    }
}

launch returns a Job, so you can manage the lifetime of each individual task manually. Essentially, whatever you would do in Reactor to avoid leaks with Disposable, you need to do with Job.

A CoroutineScope can be used to cancel an entire group of tasks. But if that’s not your use case, then you just need to manage them one by one.
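As an illustration of the two options, here is a small sketch. cancellationDemo is a made-up function: it launches three idle coroutines in one scope, cancels the first through its Job, then cancels the rest through the scope, recording isActive for each job after both steps.

```kotlin
import kotlinx.coroutines.*

// Returns the isActive flags of three jobs, first after cancelling a single
// Job and then after cancelling the whole scope.
suspend fun cancellationDemo(): Pair<List<Boolean>, List<Boolean>> {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val jobs = List(3) { scope.launch { delay(60_000) } }

    // One by one: analogous to disposing a single Reactor subscription.
    jobs[0].cancelAndJoin()
    val afterOne = jobs.map { it.isActive }

    // As a group: cancelling the scope cancels every coroutine launched in it.
    scope.cancel()
    jobs.joinAll()
    val afterAll = jobs.map { it.isActive }

    return afterOne to afterAll
}
```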

Is there a particular Reactor pattern you’d like to emulate with coroutines? I’ve used RxJava heavily in the past which is similar.

Also, I don’t think this is your case but if you need the callback code to finish inside the register lambda, then you need to use runBlocking. You cannot turn a blocking API into a non-blocking one by implementing it with coroutines. If it doesn’t have the suspend modifier, it can’t suspend.
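To show the two choices side by side, here is a sketch with invented handler names. findByName below is only a stand-in for a suspending repository call, and backgroundScope is an assumed process-wide scope, not something from your code.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a suspending repository call.
suspend fun findByName(name: String): String { delay(20); return "account:$name" }

// Assumed long-lived scope for fire-and-forget work.
private val backgroundScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

// Blocks the calling thread until the lookup finishes, so the result is
// available before the callback returns.
fun onPlayerConnectedBlocking(name: String): String =
    runBlocking { findByName(name) }

// Returns immediately; the lookup completes later on a background thread and
// the result is delivered through onResult.
fun onPlayerConnectedAsync(name: String, onResult: (String) -> Unit): Job =
    backgroundScope.launch { onResult(findByName(name)) }
```

The blocking variant ties up the thread that invoked the callback for the duration of the call; the async variant does not, but the callback has already returned by the time the result arrives.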

The event logger is not a problem, because the only thing that runs asynchronously is the database query or heavy calculations.

I never used any Disposables in Reactor. My way of working with Reactor was something like this:

Example 1:

// PlayerDisconnected
DbHandler.database.accountsReactor.updateOne(
    User::_id eq playerData._id,
    set(
        User::position setTo e.player.position,
        User::interior setTo playerData.interior,
        User::world setTo playerData.world
    )
).subscribe {
    println("CurrentThread ${Thread.currentThread().name}")
}

Example 2:

// In some singleton helper
Flux.fromIterable(Teleports.get())
    .publishOn(Schedulers.boundedElastic())
    .filter { door -> playerPosition.distance(door.position) < 1.0f }
    .subscribe { door ->
        player.position = door.position
        println("CurrentThread ${Thread.currentThread().name}")
    }

I would like to work similarly with Coroutines.

You are looking for GlobalScope. If a coroutine is left running when you don’t care about its result anymore, then that is often considered a leak. This is no different than a Reactor subscription that is left running when you don’t care about the result.

I suggest that any time you call launch, consider when you would want to cancel the job because it’s no longer relevant. I suggest the same consideration any time you subscribe with Reactor.
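For what it's worth, a rough coroutine counterpart of your Example 2 could look like the sketch below. Position, Door, and teleportNearby are types and names I made up so that it compiles on its own; your real Teleports and player types will differ. launchIn plays the role of subscribe, and the Job it returns is what you would cancel once the result no longer matters.

```kotlin
import kotlin.math.hypot
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical minimal types, only so the sketch is self-contained.
data class Position(val x: Float, val y: Float) {
    fun distance(other: Position): Float = hypot(x - other.x, y - other.y)
}
data class Door(val position: Position)

// Filters the doors near the player and reports each match, without blocking
// the caller. GlobalScope uses Dispatchers.Default when no dispatcher is given.
@OptIn(DelicateCoroutinesApi::class)
fun teleportNearby(
    doors: List<Door>,
    playerPosition: Position,
    onTeleport: (Position) -> Unit
): Job =
    doors.asFlow()
        .filter { door -> playerPosition.distance(door.position) < 1.0f }
        .onEach { door -> onTeleport(door.position) }
        .launchIn(GlobalScope) // like subscribe(), but returns a cancellable Job
```

If the work makes blocking calls, launching in GlobalScope + Dispatchers.IO is roughly the counterpart of publishOn(Schedulers.boundedElastic()).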