Coroutines for game scripting, technical questions

Context:

I’m developing a scripting API for my game engine in which scripts can be “suspended” across multiple game frames, so they read as straight-line code instead of requiring numerous tedious state machines, “command list” implementations, etc. For example:

suspend fun ScriptContext.npcBehavior() {
    while (true) {
        npc.walkTo(100f, 0f, 0f)
        npc.textBlurb("Hello!")
        npc.walkTo(0f, 0f, 0f)
        npc.textBlurb("World!")
    }
}

walkTo is implemented something like:

    suspend fun Npc.walkTo(x: Float, y: Float, z: Float) {
        suspendCancellableCoroutine { cont ->
            walkingSystem.beginWalking(this, x, y, z, cont)
        }
    }

Later, the walking system resumes the continuation once the NPC arrives:

class WalkingSystem {
    fun perFrameUpdate() {
        for (agent in agentList) {
            agent.pos += ...
            if (distance(agent.pos, agent.destination) < threshold) {
                agent.cont.resume(Unit)
            }
        }
    }
}

Of course we’d have to remove the agent from the list first, avoid concurrent modification due to resuming continuations, etc. But you get the idea.
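A minimal sketch of that bookkeeping, assuming 1-D positions and hypothetical Agent, WalkingSystem and walkTo stand-ins for the engine's real types: arrived agents are unlinked while iterating, and their continuations are resumed only after the loop, so a script that immediately starts another walk cannot modify the list mid-iteration.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.math.abs

// Hypothetical 1-D stand-ins for the engine's Npc/agent types.
class Agent(var pos: Float, val destination: Float, val cont: CancellableContinuation<Unit>)

class WalkingSystem(private val speed: Float = 10f, private val threshold: Float = 0.5f) {
    private val agents = mutableListOf<Agent>()

    fun beginWalking(from: Float, to: Float, cont: CancellableContinuation<Unit>) {
        agents += Agent(from, to, cont)
    }

    fun perFrameUpdate() {
        // 1. Move every agent; unlink the ones that arrived while iterating.
        val arrived = mutableListOf<Agent>()
        val iter = agents.iterator()
        while (iter.hasNext()) {
            val agent = iter.next()
            agent.pos += (agent.destination - agent.pos).coerceIn(-speed, speed)
            if (abs(agent.destination - agent.pos) < threshold) {
                iter.remove()
                arrived += agent
            }
        }
        // 2. Resume only after the loop: with Unconfined, resume() runs the script
        //    inline, and it may call walkTo() again, which adds to `agents`.
        for (agent in arrived) agent.cont.resume(Unit)
    }
}

suspend fun walkTo(system: WalkingSystem, from: Float, to: Float) =
    suspendCancellableCoroutine<Unit> { cont -> system.beginWalking(from, to, cont) }

fun runWalkDemo(): List<String> {
    val system = WalkingSystem()
    val log = mutableListOf<String>()
    val job = CoroutineScope(Dispatchers.Unconfined).launch {
        walkTo(system, 0f, 25f); log += "arrived at 25"
        walkTo(system, 25f, 0f); log += "arrived at 0"
    }
    var frame = 0
    while (job.isActive && frame < 100) { system.perFrameUpdate(); frame++ }
    return log
}
```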

If I use Dispatchers.Unconfined and either CoroutineStart.DEFAULT or UNDISPATCHED, it seems to work as expected, remaining on the main thread as long as I do everything on the main thread (which is mandatory for OpenGL, unfortunately; shared contexts and context swapping are not an option).

My Questions:

EDIT: I just stumbled upon Dispatchers.Main and the fact that you can implement your own, and conceptually it sounds close to what I need, while potentially permitting the entire coroutine library to be used “safely,” provided you’re okay spinning up a few extra threads. (Sandboxing this for non-coders would be ideal, though.) However, I can’t find a resource on implementing a CoroutineDispatcher myself, so I might be in for a deep dive here if the benefits are really there…

Any resources on implementing my own CoroutineDispatcher, or even better, a Main one? Could I just inherit from some existing dispatcher, override dispatch and check if the threads match, then do what Unconfined does if on main?

If implementing Main would be infinitely better than using Unconfined, then ignore the remaining questions. Otherwise…

Unconfined Questions
  1. Under my current approach with Unconfined, what conventions must scripts follow to stay on the main thread? So far, all I’m aware of is disallowing calls to withContext and delay – are there any other key methods?

  2. Is there a better way of disallowing methods than redeclaring them on a dispatch receiver with @Deprecated? Perhaps I could wrap kotlinx.coroutines behind an extremely restricted subset of commands, though allowing direct use of Channels, select, and probably more would be nice.

  3. Is there a better configuration for this than what I described? I like Unconfined because it immediately executes the script in place, guaranteeing the calling or resuming thread is the executing one. It also means I have control over all threads in my engine, without deferring to an Executor at the top level - the API is essentially self-contained. Probably the biggest caveat is the “experimental” tag on Unconfined, and fears of violating the “main thread” rule without realizing.

  4. Could I actually use withContext somehow, ensuring that execution returns to the calling thread after resuming and leaving scope? This would allow easy integration of wrapped “blocking” calls, for instance. I just can’t support any arbitrary thread running any script after resuming! (Which appears to be the case with Unconfined, if I’ve understood correctly.) EDIT: I’m not sure if this is true any more.

  5. Any major problems I’m missing with this? Any big “gotchas” from calling resume outside any coroutine related scope?
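For question 4, a small experiment (a sketch assuming kotlinx.coroutines on the JVM; the helper name is hypothetical) shows what Unconfined does after withContext: the code following the block keeps running on the thread that finished the block, not on the thread the script started on.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread names seen by an Unconfined coroutine before and after withContext.
fun unconfinedThreadDemo(): Pair<String, String> = runBlocking {
    val before = CompletableDeferred<String>()
    val after = CompletableDeferred<String>()
    launch(Dispatchers.Unconfined) {
        before.complete(Thread.currentThread().name)
        withContext(Dispatchers.IO) {
            Thread.sleep(50) // stand-in for a wrapped blocking call
        }
        // Unconfined never "returns home": this line runs on whichever thread
        // resumed the coroutine, i.e. the IO worker that finished the block.
        after.complete(Thread.currentThread().name)
    }
    before.await() to after.await()
}
```

A dispatcher bound to the game thread avoids this, because withContext resumes the caller through the caller's own dispatcher.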

Thanks for your time.


I think the main dispatcher is really the way to go - this is exactly how we should control scheduling to different threads in coroutines.

I never tried to integrate with an existing event loop, but you can look at the implementations for JavaFX, Android or Swing. They look complicated, but I think the main thing to implement is trivial:

// JavaFX
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = Platform.runLater(block)

// Android
override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

// Swing 
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = SwingUtilities.invokeLater(block)

You don’t necessarily have to register your dispatcher as Dispatchers.Main; you can simply create a global variable, and in this case I suspect the above is all you need (?). To register as Dispatchers.Main you need some additional code (a dispatcher factory, etc.), and it also has to support immediate scheduling, which is pretty easy to implement (we simply need to check if we are on the main thread already).
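A sketch of the minimal case described above, assuming the game loop calls a hypothetical runPending() once per frame on the main thread. Only dispatch() is overridden; there is no Delay, no factory and no Dispatchers.Main registration.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext

// Hypothetical dispatcher for a game loop that owns the main (OpenGL) thread.
// Any thread may call dispatch(); only the game thread runs the queued blocks.
class GameLoopDispatcher : CoroutineDispatcher() {
    private val queue = ConcurrentLinkedQueue<Runnable>()

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.add(block) // never drop a block: the docs require it to run eventually
    }

    /** Called once per frame on the game thread; returns how many blocks ran. */
    fun runPending(): Int {
        var ran = 0
        // Drain only what was queued before this call, so a coroutine that
        // re-dispatches itself (e.g. via yield()) waits for the next frame.
        repeat(queue.size) {
            val block = queue.poll() ?: return ran
            block.run()
            ran++
        }
        return ran
    }
}

fun gameLoopDemo(): List<String> {
    val dispatcher = GameLoopDispatcher()
    val log = mutableListOf<String>()
    val job = CoroutineScope(SupervisorJob() + dispatcher).launch {
        log += "script started"
        yield()
        log += "resumed next frame"
    }
    var frames = 0
    while (job.isActive && frames < 10) {
        dispatcher.runPending()
        frames++
    }
    return log
}
```

Draining only the blocks that were queued when runPending() started is a design choice: it bounds the work per frame, at the cost of one frame of latency for coroutines that re-dispatch themselves.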

Also, all mentioned dispatchers implement Delay, which adds some code as well. I believe this is not necessary; it is only an optimization in the coroutines machinery, and as a matter of fact, Delay is marked as @InternalCoroutinesApi. If you don’t implement it, coroutines will use their internal shared dispatcher for scheduling tasks in the future.
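To check the claim that Delay is optional, a sketch with a hypothetical dispatcher that deliberately does not implement it; the loop at the bottom stands in for a game loop that waits briefly for work each frame. On kotlinx.coroutines 1.6.x the code after delay() is expected to come back through dispatch() onto the pumping thread.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

// Hypothetical game-thread dispatcher that deliberately does NOT implement Delay.
class PumpDispatcher : CoroutineDispatcher() {
    private val queue = LinkedBlockingQueue<Runnable>()
    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.put(block)

    /** Runs at most one queued block, waiting up to [waitMs] for one to arrive. */
    fun pump(waitMs: Long) { queue.poll(waitMs, TimeUnit.MILLISECONDS)?.run() }
}

// Returns true if the code after delay() ran back on the pumping (game) thread.
fun delayResumesOnGameThread(): Boolean {
    val dispatcher = PumpDispatcher()
    val gameThread = Thread.currentThread()
    var resumedOnGameThread = false
    val job = CoroutineScope(dispatcher).launch {
        // No Delay here, so kotlinx.coroutines times the wait on its own internal
        // thread, then resumes through dispatch(), i.e. back onto our queue.
        delay(20)
        resumedOnGameThread = Thread.currentThread() === gameThread
    }
    val deadline = System.nanoTime() + 2_000_000_000L
    while (job.isActive && System.nanoTime() < deadline) dispatcher.pump(5)
    return resumedOnGameThread
}
```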


Any ideas what’s happening here? Android file, line 167:

    private fun cancelOnRejection(context: CoroutineContext, block: Runnable) {
        context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed"))
        Dispatchers.IO.dispatch(context, block) // << why are we dispatching after cancel? How is this valid?
    }

On top of being confusing, this method is problematic because invokeOnTimeout no longer accepts a context parameter, meaning this function can’t be given a context to cancel and dispatch anyway.

Also, I’d like my “delayer” class that schedules continuations in the future to be able to cancel outstanding tasks on application exit, but CoroutineDispatcher gives me Runnables to work with, not ContinuationCancellables with a cancel method, meaning I’m not given an obvious way to do that. Am I even supposed to try?

I don’t really know, it goes much deeper into coroutines internals than my current knowledge. I guess it could be related to the information provided in docs for the dispatch() function:

This method should guarantee that the given block will be eventually invoked, otherwise the system may reach a deadlock state and never leave it.

And if I think about it, it makes a lot of sense. It is better to execute a continuation, even in a wrong dispatcher and even after cancelling the context, so it could at least throw an exception and propagate to parents. If we simply ignore the block of code and never execute it, it may become frozen indefinitely.
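A sketch of that reasoning applied to a game loop, modelled on the Android dispatcher's cancelOnRejection() quoted above. ClosableGameDispatcher, close() and runPending() are hypothetical names: once closed, a new block is not dropped; its coroutine is cancelled and the block runs on Dispatchers.IO, where it observes the CancellationException and completes.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext

// Hypothetical game-thread dispatcher that can be closed at shutdown. Blocks queued
// before close() are still the game loop's job; blocks arriving afterwards are
// "rejected" the way the Android dispatcher does it: cancel, then run elsewhere.
class ClosableGameDispatcher : CoroutineDispatcher() {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val lock = Any()
    private var closed = false

    fun close() = synchronized(lock) { closed = true }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        synchronized(lock) {
            if (!closed) { queue.add(block); return }
        }
        context.cancel(CancellationException("game loop dispatcher was closed"))
        // Still run the block so the coroutine observes the cancellation,
        // executes its finally blocks and completes instead of hanging forever.
        Dispatchers.IO.dispatch(context, block)
    }

    fun runPending() { while (true) (queue.poll() ?: return).run() }
}

fun rejectionDemo(): String = runBlocking {
    val dispatcher = ClosableGameDispatcher()
    val gate = CompletableDeferred<Unit>()
    val outcome = CompletableDeferred<String>()
    launch(dispatcher) {
        try {
            gate.await()                        // script waits for some event
            outcome.complete("resumed normally")
        } catch (e: CancellationException) {
            outcome.complete("cancelled")       // runs on an IO thread
            throw e
        }
    }
    dispatcher.runPending()   // one frame: the script starts and suspends
    dispatcher.close()        // the engine shuts down
    gate.complete(Unit)       // this resumption is rejected
    outcome.await()
}
```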

What do you mean by saying it no longer accepts a context?

You can override the interceptContinuation() to get access to continuations and then check if they are of CancellableContinuation type. By wrapping continuations into your own class you can track when they are being resumed, cancelled, etc. I don’t know if this is sufficient in your case, but it’s better than nothing.
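A related option that needs no interceptor at all, sketched with a hypothetical frame-based FrameWaiter: register the cleanup at the suspension point with CancellableContinuation.invokeOnCancellation, so a cancelled wait removes itself from the waiter's bookkeeping. This assumes cancellation happens on the game thread; otherwise the map needs a lock.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// Hypothetical frame-based waiter owned by the game thread. It assumes cancel()
// is also called on the game thread; otherwise guard `waiting` with a lock.
class FrameWaiter {
    private val waiting = mutableMapOf<CancellableContinuation<Unit>, Int>()

    val pending: Int get() = waiting.size

    suspend fun waitFrames(frames: Int) = suspendCancellableCoroutine<Unit> { cont ->
        waiting[cont] = frames
        // Cleanup sits at the suspension point, so the dispatcher never has to
        // map Runnables back to cancelled coroutines.
        cont.invokeOnCancellation { waiting.remove(cont) }
    }

    /** Called once per frame; resumes every wait whose frame count ran out. */
    fun tick() {
        val ready = waiting.filterValues { it <= 1 }.keys.toList()
        for (entry in waiting.entries) entry.setValue(entry.value - 1)
        for (cont in ready) {
            waiting.remove(cont)
            cont.resume(Unit)
        }
    }
}

fun frameWaiterDemo(): Int {
    val waiter = FrameWaiter()
    val job = CoroutineScope(Dispatchers.Unconfined).launch { waiter.waitFrames(100) }
    waiter.tick()
    job.cancel()          // e.g. application exit
    return waiter.pending // the cancelled wait has already been removed
}
```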


Also, aren’t you going too deep into this? Most of the time we are not supposed to implement our own dispatchers, etc.; this is pretty advanced stuff. I understand this is probably a necessity to integrate with the existing event loop, but why do you need your own delay and cancellation support directly in the dispatcher?

When you start the application, usually somewhere near the main() you start a “root” coroutine or root context and when you need to exit the application, you just cancel this root coroutine - it should automatically propagate to all subtasks of your application.
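A sketch of that root-coroutine shutdown, assuming a hypothetical engine-wide scope. Here runBlocking's thread stands in for the game thread, and both scripts are parked in delay() when the root is cancelled.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlin.coroutines.ContinuationInterceptor

fun rootScopeShutdownDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Hypothetical engine-wide root scope. It runs on runBlocking's thread here,
    // standing in for the game thread; SupervisorJob keeps one failing script
    // from cancelling its siblings.
    val root = CoroutineScope(SupervisorJob() + coroutineContext[ContinuationInterceptor]!!)
    repeat(2) { i ->
        root.launch {
            try {
                delay(Long.MAX_VALUE)          // a script waiting "forever"
            } finally {
                log += "script $i cleaned up"  // still runs on cancellation
            }
        }
    }
    yield()                                    // let both scripts reach delay()
    root.cancel()                              // application exit
    root.coroutineContext[Job]!!.join()        // wait for the cleanup to finish
    log
}
```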

edit:
Documentation even states:

Cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.

Although, it doesn’t say a dispatcher can’t/shouldn’t somehow participate in cancellations. It only says it doesn’t have to.

What do you mean by saying it no longer accepts a context?

Never mind, I updated my coroutines library from 1.3.3 to 1.6.4 and the method signature is as expected now.

why do you need your own delay and cancellation support directly in the dispatcher?

Well, delay seems easy enough to add, given that I already have a delay mechanism in my game loop that works with coroutines. The reason, I suppose, is to support the kotlinx.coroutines delay function instead of attempting to disallow it via a wonky “deprecated method with the same name” approach. Even without delay, I’ll have to handle cleanup of dispatched-but-not-delayed coroutines.

Cancellation is something I assumed was required in these methods, judging by the Android implementation; not having to worry about it might indeed simplify things. Looking at interceptContinuation, it appears to be final in CoroutineDispatcher, and I’m not sure I can circumvent that. It’s invoked on resume, so I suppose I would just call run and hope it triggers.

Most of the time we are not supposed to implement our own dispatchers, etc

Right, but if I want to use Dispatchers.Main, I am actually required to extend this class. It makes sense for my engine as well: Main is usually intended for a single UI thread, which is probably restricted for the same reason my engine must be (use of OpenGL or another graphics library that mandates interaction on a specific thread). Using Main over the Unconfined-everywhere approach also allows any arbitrary coroutine to schedule a task in the context of my engine, a nice bonus.

EDIT: success! My Main implementation appears to be injected properly and working, including calls to delay… but that doesn’t mean there aren’t heinous issues involving cancellation and shut down, as we discussed.

There’s also the concern of this being internal APIs, which may change under my feet… but I don’t really see a way around that - injecting any Main requires use of kotlinx.coroutines.internal.MainDispatcherFactory with Java’s ServiceLoader.
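For reference, a sketch of the kind of registration described here, written against kotlinx.coroutines 1.6.x as far as I know it; MainDispatcherFactory is internal and may change. EngineMain, EngineMainDispatcher and EngineMainDispatcherFactory are hypothetical names, and the factory only takes effect once it is listed in the META-INF/services file named in the comment.

```kotlin
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.MainCoroutineDispatcher
import kotlinx.coroutines.internal.MainDispatcherFactory
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext

// Hypothetical engine-side state: the GL thread and its work queue.
object EngineMain {
    @Volatile var thread: Thread? = null
    val queue = ConcurrentLinkedQueue<Runnable>()

    /** Called once per frame on the GL thread. */
    fun runPending() {
        thread = Thread.currentThread()
        while (true) (queue.poll() ?: return).run()
    }
}

class EngineMainDispatcher(private val isImmediate: Boolean = false) : MainCoroutineDispatcher() {
    // Same queue, but skips it when already on the GL thread (like Main.immediate).
    override val immediate: MainCoroutineDispatcher by lazy {
        if (isImmediate) this else EngineMainDispatcher(isImmediate = true)
    }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean =
        !isImmediate || Thread.currentThread() !== EngineMain.thread

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        EngineMain.queue.add(block)
    }
}

// Found via ServiceLoader: list this class's fully qualified name in
// META-INF/services/kotlinx.coroutines.internal.MainDispatcherFactory
@OptIn(InternalCoroutinesApi::class)
class EngineMainDispatcherFactory : MainDispatcherFactory {
    override val loadPriority: Int = 100 // the highest-priority factory wins
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
        EngineMainDispatcher()
}
```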

So assuming I can get the shutdown process and cancellation sorted… is the pattern I initially outlined “valid” with use of my own Dispatchers.Main? I.e. is it safe for code outside any coroutine scope but on the “main thread” to maintain references to continuations, then call resume on them when some condition has been met?

I’m not sure if I understood you correctly, but both delaying and cancellations are provided by the coroutines framework itself and I think you don’t have to do anything to have them working properly.

delay() should work no matter if you implement Delay or not. I’m not 100% sure about the behavior here, but I think it checks if the dispatcher can handle delays by itself (so if it implements Delay) and if not then it schedules delayed task to their internal dispatcher and then it schedules to the correct one.

Cancellations “just work”; they don’t require support in the dispatcher, regardless of whether the coroutine is currently running, scheduled for immediate execution, or scheduled for delayed execution.

Did you try to… not implement any of these and just use delays and cancel delayed or not delayed tasks?

Ahh, right.

If we are going to use coroutines with a framework that utilizes the “main” thread, then as you said, I think implementing our own dispatcher is the proper way to handle this case. Unconfined is only a workaround and we can easily break something.

But we don’t have to use Dispatchers.Main if it complicates things or if it is an internal API. There is nothing magic about the Dispatchers.Main, it is simply a global variable with the value provided by libraries in the classpath. That’s it. I guess the main reason we have it is to reuse parts of UI code between Android and JS or to make the coroutines API more standard between platforms. In your case you can simply create a global oglMainDispatcher variable by extending CoroutineDispatcher (or even ContinuationInterceptor if you prefer) and it will work exactly the same as Dispatchers.Main.
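A sketch of the global-variable approach, assuming the game loop repeatedly calls a hypothetical pump() on the GL thread. A background coroutine hops onto the GL thread with withContext only for the part that must touch OpenGL, which also covers question 4 from the original post.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

// Hypothetical dispatcher whose queue is drained by the GL thread.
class GlThreadDispatcher : CoroutineDispatcher() {
    private val queue = LinkedBlockingQueue<Runnable>()
    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.put(block)

    /** Game loop: runs at most one queued block, waiting up to [waitMs] for it. */
    fun pump(waitMs: Long) { queue.poll(waitMs, TimeUnit.MILLISECONDS)?.run() }
}

// The plain global suggested above; nothing is registered as Dispatchers.Main.
val oglMainDispatcher = GlThreadDispatcher()

// Background work hops onto the GL thread only for the part that needs it.
fun withContextHopDemo(): Boolean {
    val glThread = Thread.currentThread()        // this thread plays the game loop
    var uploadedOnGlThread = false
    val job = CoroutineScope(Dispatchers.Default).launch {
        // ...decode an image here, off the GL thread...
        withContext(oglMainDispatcher) {
            // GL calls (e.g. a texture upload) would go here.
            uploadedOnGlThread = Thread.currentThread() === glThread
        }
        // Back on a Dispatchers.Default worker after the block.
    }
    val deadline = System.nanoTime() + 2_000_000_000L
    while (job.isActive && System.nanoTime() < deadline) oglMainDispatcher.pump(5)
    return uploadedOnGlThread
}
```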

Yes, you can store a continuation anywhere you want and then resume it from any thread you want at any time you need 🙂 You don’t have to resume from the coroutines context or from the main thread - you can do it anywhere. And yes, it should work correctly with the approach of the “main dispatcher”.
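A sketch of that claim with a hypothetical queue-based dispatcher: the continuation is parked in an AtomicReference, resumed from a plain background thread, and the coroutine still continues on the thread that pumps the dispatcher.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume

// Hypothetical game-thread dispatcher drained by pump().
class QueueDispatcher : CoroutineDispatcher() {
    private val queue = LinkedBlockingQueue<Runnable>()
    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.put(block)
    fun pump(waitMs: Long) { queue.poll(waitMs, TimeUnit.MILLISECONDS)?.run() }
}

// A continuation parked in a plain field and resumed from an unrelated thread
// still continues on the dispatcher's (game) thread.
fun externalResumeDemo(): Pair<String, Boolean> {
    val dispatcher = QueueDispatcher()
    val gameThread = Thread.currentThread()
    val parked = AtomicReference<CancellableContinuation<String>?>(null)
    var reply = ""
    var onGameThread = false
    val job = CoroutineScope(dispatcher).launch {
        reply = suspendCancellableCoroutine<String> { cont -> parked.set(cont) }
        onGameThread = Thread.currentThread() === gameThread
    }
    dispatcher.pump(0)                          // frame: the script starts and parks
    thread { parked.get()?.resume("pong") }     // e.g. a network or timer callback
    val deadline = System.nanoTime() + 2_000_000_000L
    while (job.isActive && System.nanoTime() < deadline) dispatcher.pump(5)
    return reply to onGameThread
}
```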


Thank you for your assistance, it’s been extremely helpful 🙏

delay() should work no matter if you implement Delay or not. I’m not 100% sure about the behavior here, but I think it checks if the dispatcher can handle delays by itself (so if it implements Delay ) and if not then it schedules delayed task to their internal dispatcher and then it schedules to the correct one.

I’m not sure how it would execute anything on the main thread without my intervention, since I’m creating the thread myself; I think its only opportunity to resume control would be at suspension points and resume calls, which could be few and far between.

I suppose it could also be spinning up a separate thread that sleeps, wakes, and then schedules resumption via CoroutineDispatcher.dispatch, i.e. a bare-minimum implementation, but that would be less accurate and unnecessarily resource-intensive for my purposes (I don’t want extra threads doing anything if I can help it). If the internal API is brittle enough, however, it would be something to consider.

Regardless of whether I override it, I’ll be exposing my own delay methods anyway (they’re already implemented and relatively trivial - just a min-heap of continuations sorted on wake-up time, and like you said, safe to simply call resume once ready.)
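The min-heap delayer described here can be sketched with only kotlin.coroutines from the standard library. FrameClockDelayer, waitSeconds() and advance() are hypothetical names, and game time is assumed to be a Double in seconds advanced once per frame.

```kotlin
import java.util.PriorityQueue
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical game-time delayer: a min-heap of continuations keyed on wake-up time.
class FrameClockDelayer {
    private class Sleeper(val wakeAt: Double, val seq: Long, val cont: Continuation<Unit>)

    // Ties on wakeAt are broken by insertion order, so equal times resume FIFO.
    private val heap = PriorityQueue<Sleeper>(compareBy<Sleeper>({ it.wakeAt }, { it.seq }))
    private var nextSeq = 0L
    var now = 0.0
        private set

    suspend fun waitSeconds(seconds: Double) = suspendCoroutine<Unit> { cont ->
        heap.add(Sleeper(now + seconds, nextSeq++, cont))
    }

    /** Called once per frame on the game thread. */
    fun advance(dt: Double) {
        now += dt
        val due = mutableListOf<Sleeper>()
        while (heap.isNotEmpty() && heap.peek().wakeAt <= now) due += heap.poll()
        // Resume after collecting: a script that waits again right away lands
        // in the heap for a later frame instead of looping inside this call.
        for (sleeper in due) sleeper.cont.resume(Unit)
    }
}

fun delayerDemo(): List<String> {
    val clock = FrameClockDelayer()
    val log = mutableListOf<String>()
    // Start a script with no dispatcher at all; the delayer resumes it inline.
    suspend {
        clock.waitSeconds(0.5); log += "t=${clock.now}"
        clock.waitSeconds(1.0); log += "t=${clock.now}"
    }.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    repeat(8) { clock.advance(0.25) }
    return log
}
```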

Cancellations “just work”, they don’t require support in the dispatcher. No matter if coroutine is currently running, it is scheduled for immediate execution or for delayed execution.

Right, it’s not about supporting cancellations themselves, but rather supporting “rejection” of dispatches in the dispatcher via cancellation. The reason I hypothetically need rejection is to support “shutting down” the main thread and rejecting all dispatches during and after shutdown. There might be some other mechanism for achieving this; I don’t think it’s coroutine scope, however, because hypothetically anyone, anywhere, in any scope could call withContext(Dispatchers.Main), or use my dispatcher(s) directly as long as they are exposed, statically or not.

Even so, letting the main thread empty out all dispatches by executing them, then waiting on any delays beneath a certain threshold, seems like a reasonable strategy to at least minimize rejections.

Now that I’m thinking about it, it’s not like default Dispatchers halt the world to process pending coroutines, or invoke cancel… now I’m curious how Kotlin handles this by default. Maybe it’s okay to just close the process without touching the data structures responsible for dispatch and delay - however, coroutines knowing they’ll get a CancellationException at any interrupted delay point would be nice.

But we don’t have to use Dispatchers.Main if it complicates things or if it is an internal API.

Internal APIs are currently only needed for injecting Main and implementing Delay. I think the only benefits of Main are:

  • Enabling established convention (i.e. easier to pick up and learn, no error on calling Main)
  • Integration of libraries referencing it (hypothetically, I have no idea here)
  • Exposing “immediate” dispatch via overriding the isDispatchNeeded method (important for performance).

The only cons are:

  • Maintaining some code that is small but multi-threaded, plus complexity in the shutdown process.
  • Having to update code if JetBrains changes anything. Between 1.3.3 and 1.6.4 the relevant changes were very minor, though technically there’s nothing stopping a major overhaul 😬
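The “immediate” benefit listed above comes down to overriding isDispatchNeeded. A sketch with a hypothetical GameDispatcher, in the spirit of Dispatchers.Main.immediate: when the caller is already on the game thread, the coroutine runs in place instead of waiting for the next frame.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext

// Hypothetical game-thread dispatcher with an optional "immediate" mode:
// when already on the game thread, run in place instead of queueing.
class GameDispatcher(private val gameThread: Thread, private val immediate: Boolean) : CoroutineDispatcher() {
    private val queue = ConcurrentLinkedQueue<Runnable>()

    override fun isDispatchNeeded(context: CoroutineContext): Boolean =
        !immediate || Thread.currentThread() !== gameThread

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.add(block)
    }

    fun runPending() { while (true) (queue.poll() ?: return).run() }
}

// Records whether a freshly launched script body ran before or after launch() returned.
fun immediateDemo(immediate: Boolean): List<String> {
    val dispatcher = GameDispatcher(Thread.currentThread(), immediate)
    val log = mutableListOf<String>()
    CoroutineScope(dispatcher).launch { log += "script body" }
    log += "after launch"
    dispatcher.runPending()
    return log
}
```

The trade-off is the usual one for immediate dispatchers: lower latency, but the resumed code runs nested inside whatever triggered it, so ordering and stack depth become the caller's concern.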

I’m thinking about something like this:

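suspend fun delay(timeMillis: Long) = suspendCancellableCoroutine<Unit> { cont ->
    val dispatcher = cont.context[ContinuationInterceptor]
    if (dispatcher is Delay) {
        // The dispatcher times the wait itself.
        dispatcher.scheduleResumeAfterDelay(timeMillis, cont)
    } else {
        // An internal shared event loop times the wait; resuming `cont` then
        // goes through `dispatcher.dispatch(...)` to reach the right thread.
        internalDispatcherThatSupportsDelaying.scheduleResumeAfterDelay(timeMillis, cont)
    }
}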

This is actually more or less what it really does, we can see this in the code.

It may be less accurate due to double dispatching. On the other hand, I once read a long thread somewhere specifically about making delays in Kotlin coroutines accurate. They implemented some magic tricks utilizing knowledge of JVM and OS internals to make it as accurate as possible. I would expect our custom implementation to be less accurate even if dispatching only once. But we can’t know this without running some tests.

Coroutines don’t have the concept of cancelling/shutting down dispatchers. They have a concept of cancelling/shutting down coroutines or tasks. If you cancelled the root context/job/coroutine that started your whole application, then you can be pretty sure all application’s activity has been cancelled and nothing is running in the dispatcher anymore.

Of course, graceful shutdown requires some time and during it I suspect you will still observe dispatches to your main thread. However, I guess it should be pretty quick, it will probably only throw CancellationException from all existing coroutines and that’s it. Also, this is actually a feature, not a bug. Remember we could have some finally {} blocks to close resources or do other cleanups and we would like to run them. Killing in the middle, while quicker, is less reliable.

If you have to guarantee no dispatches on the main thread after some point, then I think you can use a similar technique as in the dispatcher for Android - simply dispatch to Dispatchers.IO instead, it should throw immediately anyway. But if you mean to reject dispatches as a way to stop the application, then you shouldn’t really do this. Cancel the root coroutine and it will do everything for you.

You don’t have to wait for delays - delayed continuations will be cancelled immediately.

I just wanted to update my progress here - I’ve now implemented my own Dispatchers corresponding to either the main thread generally OR a specific phase of the game loop on the main thread. I implemented Delay for higher-precision suspending delay calls, though I also offer my own wait functions that offer additional parameters.

Shutdown is still a little hairy, but right now the lifecycle is fine and I don’t have any problems. Mainly it’s a question of whether I should assert or throw if coroutines are still running even after giving them additional time (e.g. to handle the cancellations that should all have been issued by then).
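One way to make the assert-or-throw decision concrete, sketched with hypothetical names (MainLoopDispatcher, shutdownScripts): cancel the root job, keep pumping the game-thread queue so cancelled coroutines can run their finally blocks there, and fail loudly if they have not finished within a grace period. Throwing versus logging is a policy choice; check() is used here only as an example.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.launch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

// Hypothetical game-thread dispatcher drained by pump().
class MainLoopDispatcher : CoroutineDispatcher() {
    private val queue = LinkedBlockingQueue<Runnable>()
    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.put(block)
    fun pump(waitMs: Long) { queue.poll(waitMs, TimeUnit.MILLISECONDS)?.run() }
}

/**
 * Hypothetical shutdown step, run on the game thread: cancel every script, then
 * keep pumping, because cancelled coroutines still need this thread to run their
 * finally blocks. Throws if they have not all finished within [graceMs].
 */
fun shutdownScripts(root: Job, dispatcher: MainLoopDispatcher, graceMs: Long) {
    root.cancel()
    val deadline = System.nanoTime() + graceMs * 1_000_000
    while (!root.isCompleted && System.nanoTime() < deadline) dispatcher.pump(1)
    check(root.isCompleted) { "scripts still running ${graceMs}ms after cancellation" }
}

fun shutdownDemo(): Boolean {
    val dispatcher = MainLoopDispatcher()
    val root = SupervisorJob()
    var cleanedUp = false
    CoroutineScope(root + dispatcher).launch {
        try {
            awaitCancellation()
        } finally {
            cleanedUp = true  // runs on the game thread during shutdownScripts()
        }
    }
    dispatcher.pump(0)        // one frame: the script starts and suspends
    shutdownScripts(root, dispatcher, graceMs = 1_000)
    return cleanedUp
}
```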