NonCancellable context for a clean shutdown of network communication

I have a coroutine that keeps sending packets to a remote host every second to tell that host that a local action is ongoing. Once the action is finished, a specific “action done” packet must be sent. Pseudocode:

try {
    while (true) {
        delay(1000)
        sendPacket(generateActionIsOngoingPacket())
    }
} finally {
    sendPacket(generateActionIsFinishedPacket())
}

That way, as soon as my local action is done, I can stop the repeated packet sending simply by cancelling the coroutine's job.

However, sendPacket uses a withContext(Dispatchers.IO) block, since sending a packet is an IO operation. And due to the prompt cancellation guarantee, once the coroutine is cancelled, withContext does not run its block at all: it throws a CancellationException instead. So no “action is finished” packet is sent.

One way to handle this is to use NonCancellable:

try {
    while (true) {
        delay(1000)
        sendPacket(generateActionIsOngoingPacket())
    }
} finally {
    withContext(NonCancellable) {
        sendPacket(generateActionIsFinishedPacket())
    }
}

This works. The “action is finished” packet is sent. Cancelling the job does what I expect.
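Both variants can be reproduced with a small sketch. It assumes a hypothetical sendPacket that hops to Dispatchers.IO like the real one but only records each packet in a list, and it shortens the interval to 100 ms; runHeartbeat and log are illustrative names, not part of any API.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical sendPacket: switches to Dispatchers.IO like the real one,
// but only records the packet in a list.
suspend fun sendPacket(packet: String, log: MutableList<String>) {
    withContext(Dispatchers.IO) { log.add(packet) }
}

// Runs the heartbeat for about 250 ms, cancels it and returns the packets sent.
fun runHeartbeat(useNonCancellable: Boolean): List<String> {
    val log = CopyOnWriteArrayList<String>()
    runBlocking {
        val job = launch {
            try {
                while (true) {
                    delay(100)
                    sendPacket("ongoing", log)
                }
            } finally {
                if (useNonCancellable) {
                    // NonCancellable is always active, so the nested
                    // withContext(Dispatchers.IO) still runs its block.
                    withContext(NonCancellable) { sendPacket("finished", log) }
                } else {
                    // The job is already cancelled: withContext throws a
                    // CancellationException before running its block.
                    sendPacket("finished", log)
                }
            }
        }
        delay(250)
        job.cancelAndJoin()
    }
    return log
}
```

Calling runHeartbeat(useNonCancellable = false) returns only "ongoing" entries, while runHeartbeat(useNonCancellable = true) returns a list that ends with "finished".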

However, I am not sure that this is the best approach. NonCancellable can cause a lot of problems, since the code inside it cannot be cancelled at all.

Also, at a deeper level, another question comes up: in this context, what does cancelling actually signify? Cancellation may happen for multiple reasons. Perhaps the local action simply finished in an orderly fashion. But perhaps somewhere else a fatal error was detected, and now the whole system is shutting down. In the latter case, it might not be a good idea to try to send that “action is finished” packet: after all, what if IO is also affected by that fatal error? What if it hangs indefinitely?
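One way to bound that hang risk is to put a timeout inside the NonCancellable block. The sketch below assumes a hypothetical sendFinalPacket helper and a hypothetical hangingSend that never completes (standing in for a hung device); NonCancellable and withTimeoutOrNull are the real kotlinx.coroutines APIs.

```kotlin
import kotlinx.coroutines.*

// Sends the final packet even if the caller is cancelled, but gives up after
// timeoutMillis. Returns true if send() finished in time, false otherwise.
suspend fun sendFinalPacket(timeoutMillis: Long, send: suspend () -> Unit): Boolean =
    withContext(NonCancellable) {
        withTimeoutOrNull(timeoutMillis) {
            send()
            true
        } ?: false
    }

// Hypothetical send that never completes, simulating an unresponsive device.
suspend fun hangingSend() {
    delay(Long.MAX_VALUE)
}

// Cancels a coroutine whose cleanup hits the hung device. Returns what
// sendFinalPacket reported and how long cancelAndJoin took (in ms).
fun shutDownWithHungDevice(): Pair<Boolean?, Long> {
    var sent: Boolean? = null
    var elapsed = 0L
    runBlocking {
        val job = launch {
            try {
                delay(Long.MAX_VALUE) // stands in for the heartbeat loop
            } finally {
                sent = sendFinalPacket(timeoutMillis = 200) { hangingSend() }
            }
        }
        delay(50)
        val start = System.currentTimeMillis()
        job.cancelAndJoin() // returns after roughly the timeout instead of hanging
        elapsed = System.currentTimeMillis() - start
    }
    return sent to elapsed
}
```

Here the cleanup still runs after cancellation, but a dead device costs at most the timeout. If the send completes in time, sendFinalPacket returns true.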

Some refinements to my solution I was thinking of are:

  1. Make it a requirement for sendPacket implementations that they must time out after a few seconds if the underlying IO is unresponsive. This at least prevents permanently hanging NonCancellable blocks.
  2. Write a custom subclass of Job and add an extra function like “finish”. It would internally call cancel, but pass a subclass of CancellationException. The code above would then check for that exception.

It would look like this:

class CustomJob : Job {
    // (remaining Job members omitted)
    fun finish() = cancel(MyCustomCancellationExceptionSubclass())
}

// [...]

try {
    while (true) {
        delay(1000)
        sendPacket(generateActionIsOngoingPacket())
    }
} catch (e: MyCustomCancellationExceptionSubclass) {
    withContext(NonCancellable) {
        sendPacket(generateActionIsFinishedPacket())
    }
    throw e // rethrow so the coroutine still completes as cancelled
}

That way, it would only send that “action is finished” packet if the coroutine is cancelled via the finish call. In non-orderly shutdown cases (like when a fatal error occurs), it would show the regular “cancel it all now” behavior.
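Implementing Job directly means implementing all of its members, and the Job documentation states that it is not stable for inheritance in third-party code. A lighter variant of the same idea, swapped in here rather than the proposed subclass, is an extension function. ActionFinishedException, finish() and runHeartbeatWithFinish are hypothetical names, and sendPacket again only records packets.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical marker for an orderly finish; it plays the role of
// MyCustomCancellationExceptionSubclass.
class ActionFinishedException : CancellationException("action finished")

// Extension function instead of a custom Job implementation.
fun Job.finish() = cancel(ActionFinishedException())

// Hypothetical sendPacket that records the packet on Dispatchers.IO.
suspend fun sendPacket(packet: String, log: MutableList<String>) {
    withContext(Dispatchers.IO) { log.add(packet) }
}

// Stops the heartbeat with finish() (orderly) or cancel() (abrupt).
fun runHeartbeatWithFinish(orderly: Boolean): List<String> {
    val log = CopyOnWriteArrayList<String>()
    runBlocking {
        val job = launch {
            try {
                while (true) {
                    delay(100)
                    sendPacket("ongoing", log)
                }
            } catch (e: ActionFinishedException) {
                // Only an orderly finish sends the final packet.
                withContext(NonCancellable) { sendPacket("finished", log) }
                throw e // rethrow so the coroutine still ends as cancelled
            }
        }
        delay(250)
        if (orderly) job.finish() else job.cancel()
        job.join()
    }
    return log
}
```

This relies on the CancellationException passed to cancel(cause) being the one rethrown at the suspension point, so a plain cancel() skips the catch block entirely.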

Thoughts? Are my reservations about NonCancellable correct here?

I’d suggest putting the loop in its own coroutine and just canceling that.
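A minimal sketch of that suggestion, assuming the local action can be expressed as a suspend function; reportAction, the action parameter and the recording sendPacket are hypothetical.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical sendPacket that records the packet on Dispatchers.IO.
suspend fun sendPacket(packet: String, log: MutableList<String>) {
    withContext(Dispatchers.IO) { log.add(packet) }
}

// Only the heartbeat loop runs in its own child coroutine. Cancelling that
// child leaves the calling coroutine active, so the final sendPacket (and the
// withContext inside it) runs normally, without NonCancellable.
suspend fun reportAction(log: MutableList<String>, action: suspend () -> Unit) {
    coroutineScope {
        val heartbeat = launch {
            while (true) {
                delay(100)
                sendPacket("ongoing", log)
            }
        }
        action()                  // the local action itself
        heartbeat.cancelAndJoin() // stop just the loop
    }
    sendPacket("finished", log)
}
```

For example, runBlocking { reportAction(log) { delay(250) } } leaves a log whose last entry is "finished".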

I will sometimes leverage Flow operators instead of messing with Job directly.

val isActionOngoing = MutableStateFlow(true)
... 
isActionOngoing.transformWhile { emit(it); it}.collectLatest {
    while (it) {
        delay(1000)
        sendPacket(generateActionIsOngoingPacket())
    }
}
sendPacket(generateActionIsFinishedPacket())

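Edit: I realized takeWhile wasn’t what I intended. The false value also needs to be emitted so that it can cancel the loop: with takeWhile, the flow completes without emitting false, and collectLatest then keeps waiting for the block of the last true value, whose while loop never ends.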
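A self-contained version of the flow approach, assuming the same recording sendPacket and a 100 ms interval instead of one second; heartbeat and runFlowHeartbeat are illustrative names. The explicit type arguments on transformWhile only make the inference obvious.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical sendPacket that records the packet on Dispatchers.IO.
suspend fun sendPacket(packet: String, log: MutableList<String>) {
    withContext(Dispatchers.IO) { log.add(packet) }
}

// transformWhile is marked experimental, hence the opt-in.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun heartbeat(isActionOngoing: StateFlow<Boolean>, log: MutableList<String>) {
    isActionOngoing
        // Emit each value, including the first false, then stop collecting.
        .transformWhile<Boolean, Boolean> { ongoing -> emit(ongoing); ongoing }
        // Each new value cancels the block still running for the previous one.
        .collectLatest { ongoing ->
            while (ongoing) {
                delay(100)
                sendPacket("ongoing", log)
            }
        }
    // Reached once false has been processed; this coroutine is still active.
    sendPacket("finished", log)
}

fun runFlowHeartbeat(): List<String> {
    val isActionOngoing = MutableStateFlow(true)
    val log = CopyOnWriteArrayList<String>()
    runBlocking {
        val job = launch { heartbeat(isActionOngoing, log) }
        delay(250)
        isActionOngoing.value = false // the local action is done
        job.join()                    // returns once "finished" has been sent
    }
    return log
}
```

runFlowHeartbeat() returns a list of "ongoing" entries followed by exactly one "finished". Replacing collectLatest with collect would make heartbeat hang, because the block for true would never be cancelled.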


Ah, so you’d create an inner coroutine just for the loop. That works.

I do realize now that I did not emphasize one detail though: It is necessary to make sure that I wait for the sendPacket(generateActionIsFinishedPacket()) line to be executed before I shut down the network device that is used inside sendPacket. For example, if I quit the application, I have to tear down that device. Before that, I have to cancel that coroutine, and wait for that last packet to be sent. In my version above, I can simply do a cancelAndJoin for that. In your version, I guess I could first cancel the inner coroutine and then call join on the outer one?
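The ordering described here (stop the loop, wait for the final packet, then tear down the device) can be sketched as follows. Device is a hypothetical stand-in that throws if it is used after close(), and the loop and the coroutine that sends the final packet are two separate launches, which is one way to arrange the inner/outer split.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical network device that must not be used after close().
class Device {
    val sent = CopyOnWriteArrayList<String>()
    @Volatile private var closed = false

    suspend fun send(packet: String) {
        withContext(Dispatchers.IO) {
            check(!closed) { "device already torn down" }
            sent.add(packet)
        }
    }

    fun close() {
        closed = true
    }
}

fun runAndShutDown(): List<String> {
    val device = Device()
    runBlocking {
        // The heartbeat loop runs in its own coroutine ...
        val heartbeat = launch {
            while (true) {
                delay(100)
                device.send("ongoing")
            }
        }
        // ... and this coroutine sends the final packet once the loop is gone.
        val outer = launch {
            heartbeat.join()
            device.send("finished")
        }
        delay(250)
        heartbeat.cancel() // stop the loop only
        outer.join()       // wait until "finished" has really been sent
        device.close()     // now the device can be torn down safely
    }
    return device.sent
}
```

Joining a cancelled job returns normally, so the outer coroutine is not affected by the loop's cancellation and reaches its final send.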

EDIT: I really like the flow approach, and I think I’ll go with that instead. If I understand this correctly, that inner loop would then be cancelled by setting isActionOngoing to false. Why collectLatest instead of collect, though?

EDIT2: OK, got it. It is necessary so that the loop can be cancelled. With it, when I set isActionOngoing to false, the block still running for the previous true value is cancelled. Otherwise, the while (it) loop could never be cancelled. Is this correct?

Yeah, that works if you can’t just call teardown directly after sendPacket(generateActionIsFinishedPacket()).

That’s right.

Alright then. The only minor concern remaining is that transformWhile is still marked as experimental. What are the real-world consequences of using that experimental API? Have changes to experimental APIs ever broken your code?

Experimental API (at least the way the Kotlin team uses this term) generally means that some part of the design is not yet finished or may change in the future. Breaking changes are normally restricted to major version updates, e.g. Kotlin 1.5 to 1.6.
Also, while the Kotlin team obviously tries to avoid bugs, experimental features tend to have a few more of them. That said, this mostly applies to big features. I remember running into a few crashes with inline classes (at compile time; at runtime they never caused issues for me). Simple library functions don’t tend to have that problem.

Funnily enough, the experimental annotation itself broke one of my projects. At the time I was working on multiple projects using different Kotlin versions, and I did not realize that the opt-in annotations had been renamed (Experimental to RequiresOptIn, UseExperimental to OptIn) during the Kotlin 1.3 release cycle.

Also, I remember that quite a few changes were made to the coroutine system early on (I think around Kotlin 1.3, when coroutines became stable, but don’t quote me on that). While it did not affect me personally, I think many people had to do at least a minor pass over their coroutine code when updating.

Lastly, the Kotlin team tries (in my opinion, successfully) to communicate these kinds of changes as early as possible. If you follow their blog posts, watch the keynotes at KotlinConf, and maybe also follow the KEEPs of the experimental features you’re using, you should have at least a month or two before any breaking changes are made. Also, when possible, they tend to keep the old API around in a deprecated state, at least for a while.


Would transform instead of transformWhile also work? The while loop would not do anything anyway if the value is set to false.

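I see now that with transform, stopping the flow is not possible, at least not easily: a StateFlow never completes, so collectLatest would never return and the final packet would never be sent. transformWhile, on the other hand, can abort the flow in an orderly fashion (internally it uses the AbortFlowException class). Never mind.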
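If depending on transformWhile is a concern, roughly the same behaviour can be had with the stable first operator plus an explicitly cancelled loop. This is a swapped-in alternative, not what was suggested in the thread; heartbeat, runStableHeartbeat and the recording sendPacket are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical sendPacket that records the packet on Dispatchers.IO.
suspend fun sendPacket(packet: String, log: MutableList<String>) {
    withContext(Dispatchers.IO) { log.add(packet) }
}

// first { !it } suspends until the flag turns false; the loop coroutine is
// then cancelled explicitly, and the final packet is sent from this still
// active coroutine.
suspend fun heartbeat(isActionOngoing: StateFlow<Boolean>, log: MutableList<String>) {
    coroutineScope {
        val loop = launch {
            while (true) {
                delay(100)
                sendPacket("ongoing", log)
            }
        }
        isActionOngoing.first { !it } // wait for the action to end
        loop.cancelAndJoin()
    }
    sendPacket("finished", log)
}

fun runStableHeartbeat(): List<String> {
    val isActionOngoing = MutableStateFlow(true)
    val log = CopyOnWriteArrayList<String>()
    runBlocking {
        val job = launch { heartbeat(isActionOngoing, log) }
        delay(250)
        isActionOngoing.value = false
        job.join()
    }
    return log
}
```

As with the transformWhile version, runStableHeartbeat() returns "ongoing" entries followed by a single "finished".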

Good to know. I’d have to re-test everything in my project anyway after a major version change, so I guess that’s fine as long as I keep a watch on the experimental APIs.