Cancelling a suspended function via a CompletableJob

Suppose that I have some sort of worker coroutine that processes jobs from a queue (a Channel), something like this:

suspend fun sendPacket(packet: Packet) {
    val job = Job()
    val sendTask = Pair(packet, job)
    channel.send(sendTask) // send the (packet, job) pair so the worker can signal completion
    job.join() // wait until send task is complete
}

[...]

suspend fun worker() {
    while (true) {
        val (packet, job) = channel.receive()
        try {
            doBlockingSend(packet)
            job.complete()
        } catch (e: CancellationException) {
            job.cancel(e)
            throw e
        }
    }
}

This works. However, if the coroutine that calls sendPacket is cancelled while sendPacket is suspended, it would make sense to signal to the worker that the send attempt (the doBlockingSend call) should also be aborted. (Let’s assume that some sort of explicit abortBlockingSend function is available.) Otherwise, the send attempt is only seemingly cancelled: the worker remains blocked until the actual send operation inside it is done.

Any suggestions on how to handle this? I suppose this isn’t an uncommon situation, so I imagine solutions for it already exist.

Why is sendPacket not calling doBlockingSend itself? What is the goal of using worker?

Trying to manage coroutines from other coroutines yourself is certainly possible, but it’s awkward and often unnecessary. If you explain the use case, you can probably get more applicable advice.

Either way, you’ll want to use runInterruptible to actually cancel blocking code, and withContext(Dispatchers.IO) (or a custom dispatcher) to not block other coroutines from running while doBlockingSend runs.

This isn’t a super common scenario. Usually if you want to call some async code and wait for it, you just call the suspending code itself directly.

I had it like that previously, but it turns out that doBlockingSend is not thread safe: it increments internal counters that also interact with the corresponding blocking receive calls (which read those counters). Fixing that is not possible (it is old code written for strictly single-threaded applications), so I have to ensure thread safety myself. Also, special acknowledgement packets are sometimes received that must be responded to immediately, before I send out anything else.

As a solution, I run a worker on a dispatcher that uses a single-threaded executor under the hood. This lets me avoid synchronization primitives like mutexes (the coroutine Mutex can be rather slow), and also allows me to respond to these ACK packets immediately.

I do use withContext(Dispatchers.IO) around these calls, but did not know about runInterruptible. Interesting. How does that interruption work though?

Have you tried using your single-threaded dispatcher from sendPacket?

withContext(mySingleThreadedDispatcher) { doBlockingSend(packet) }

That should fix your non-thread-safe issue.
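A minimal sketch of that idea, assuming a hypothetical LegacyTransport object that stands in for the non-thread-safe legacy code (an unsynchronized counter). Callers run concurrently on Dispatchers.Default, but every call to the legacy code is confined to the one thread behind mySingleThreadedDispatcher, so no increments are lost and no Mutex is needed.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical stand-in for the legacy, non-thread-safe code.
object LegacyTransport {
    var sendCounter = 0 // plain field, no synchronization
    fun doBlockingSend(packet: String) {
        Thread.sleep(1) // simulate blocking I/O
        sendCounter++   // unsynchronized read-modify-write
    }
}

// Exactly one thread backs this dispatcher, so blocks dispatched to it never overlap.
val mySingleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

suspend fun sendPacket(packet: String) =
    withContext(mySingleThreadedDispatcher) { LegacyTransport.doBlockingSend(packet) }

fun sendManyConcurrently(count: Int): Int = runBlocking {
    // Senders run in parallel, but they serialize at the withContext boundary.
    val jobs = List(count) { i -> launch(Dispatchers.Default) { sendPacket("packet-$i") } }
    jobs.joinAll()
    LegacyTransport.sendCounter
}

fun main() {
    println(sendManyConcurrently(100)) // prints 100: no lost updates
    mySingleThreadedDispatcher.close() // shut down the executor so the JVM can exit
}
```

Because the blocking call occupies the single thread, concurrent senders simply queue up behind it, which is the same ordering guarantee the worker provided.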

I’m not sure how to address your “special acknowledgement packets” without more explanation of how your current code handles them. A single worker can’t respond immediately to anything while it’s blocked on a previous request, so I’m confused about how the worker coroutine is helping you at all.

Also, if you are using a single coroutine for your worker, you probably don’t need it to run on a single threaded dispatcher so long as you aren’t calling launch/async. While a single coroutine may switch between threads, it’s running sequentially and there will be no concurrent calls.
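To illustrate the point about a single coroutine, here is a sketch in which one worker coroutine runs on the multi-threaded Dispatchers.Default and mutates plain, unsynchronized state. It may resume on a different thread after each suspension, but its loop body never runs concurrently with itself, so the count comes out exact. The channel and counter are illustrative, not taken from the original code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun runSingleWorker(count: Int): Int = runBlocking {
    val channel = Channel<Int>()
    var processed = 0 // unsynchronized state, touched only by the worker coroutine

    // One worker on a thread pool: sequential, even though threads may change.
    val worker = launch(Dispatchers.Default) {
        for (item in channel) processed++
    }

    // Many concurrent producers on the same thread pool.
    coroutineScope {
        repeat(count) { i -> launch(Dispatchers.Default) { channel.send(i) } }
    }

    channel.close() // the worker's for-loop ends once everything has been received
    worker.join()
    processed
}

fun main() {
    println(runSingleWorker(1_000)) // prints 1000
}
```

This only holds as long as the worker does not start its own concurrent children with launch or async.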

Your single-threaded dispatcher is likely doing nothing for your worker as it is: since you wrap the calls in withContext(Dispatchers.IO), which runs them on a thread pool, I’m guessing they are not happening on a single thread anyway.

Docs for runInterruptible are here. It uses thread interruption to cancel blocking calls.
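A small sketch of how that interruption plays out, assuming the blocking call honors Thread.interrupt() (Thread.sleep does; code that ignores interrupts will not be stopped this way). Cancelling the coroutine interrupts the Dispatchers.IO thread running the block, the resulting InterruptedException is turned into a CancellationException, and cancelAndJoin returns long before the 10-second "send" would have finished. doBlockingSend here is a hypothetical stand-in.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical blocking call; Thread.sleep throws InterruptedException when interrupted.
fun doBlockingSend(): String {
    Thread.sleep(10_000)
    return "sent"
}

fun timeToCancel(): Long = runBlocking {
    val job = launch {
        // Runs the block on Dispatchers.IO and interrupts that thread on cancellation.
        runInterruptible(Dispatchers.IO) { doBlockingSend() }
    }
    delay(100) // let the blocking call start
    measureTimeMillis { job.cancelAndJoin() }
}

fun main() {
    println("cancelled after ${timeToCancel()} ms")
}
```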

If you have some other mechanism for cancelling your blocking work, then suspendCancellableCoroutine (detect current coroutine cancellation) or Job.invokeOnCompletion (detect Job finished) may be of some use.
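For the case where the legacy code offers an explicit abort, here is a sketch that keeps the original worker design and uses the CompletableJob as a two-way signal: the worker completes it on success, the sender cancels it if the sender gives up, and a Job.invokeOnCompletion handler registered by the worker calls the abort. LegacyLink, its one-shot latch, and abortBlockingSend are hypothetical stand-ins for the real legacy API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.system.measureTimeMillis

// Hypothetical legacy API: doBlockingSend blocks for up to 10 s and returns true
// if abortBlockingSend() was called meanwhile. One-shot, for demo purposes only.
class LegacyLink {
    private val abortLatch = CountDownLatch(1)
    fun doBlockingSend(packet: String): Boolean = abortLatch.await(10, TimeUnit.SECONDS)
    fun abortBlockingSend() = abortLatch.countDown()
}

// The worker completes `done` on success; the sender cancels it if it stops waiting.
class SendRequest(val packet: String, val done: CompletableJob)

suspend fun sendPacket(channel: Channel<SendRequest>, packet: String) {
    val done = Job()
    channel.send(SendRequest(packet, done))
    try {
        done.join()
    } finally {
        done.cancel() // no-op if the worker already completed it
    }
}

suspend fun worker(channel: Channel<SendRequest>, link: LegacyLink, outcomes: MutableList<Boolean>) {
    for (request in channel) {
        // Runs immediately if `done` is already cancelled, otherwise when it completes.
        // Completion handlers must be fast and must not throw.
        val handle = request.done.invokeOnCompletion { cause ->
            if (cause != null) link.abortBlockingSend()
        }
        try {
            val aborted = withContext(Dispatchers.IO) { link.doBlockingSend(request.packet) }
            outcomes += aborted
            request.done.complete() // returns false if the sender already cancelled it
        } finally {
            handle.dispose()
        }
    }
}

fun demo(): Pair<Boolean, Long> = runBlocking {
    val channel = Channel<SendRequest>()
    val link = LegacyLink()
    val outcomes = mutableListOf<Boolean>()
    val workerJob = launch { worker(channel, link, outcomes) }
    val sender = launch { sendPacket(channel, "hello") }
    delay(100) // give the worker time to enter doBlockingSend
    val elapsed = measureTimeMillis {
        sender.cancelAndJoin() // sender gives up -> done cancelled -> abortBlockingSend()
        channel.close()
        workerJob.join()       // returns quickly because the blocking send was aborted
    }
    outcomes.single() to elapsed
}

fun main() {
    val (aborted, elapsed) = demo()
    println("aborted=$aborted, worker freed after $elapsed ms")
}
```

Whether calling the real abort from another thread (the handler runs on whichever thread cancels `done`) is safe depends on the legacy code, which is worth checking.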

If you stick with the worker, I’d suggest switching around your task’s Job ancestry to leverage Structured Concurrency.

Consider this example, which demonstrates running code with a provided CoroutineScope instead of just using the current CoroutineContext.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val channel = Channel<PacketRequest>()

fun main() {
    runBlocking {
        launch { worker() }
        sendPacket(Packet)
    }
    println("All done")
}

object Packet
fun doBlockingSend(packet: Packet) { println("doBlockingSend") }

suspend fun sendPacket(packet: Packet) {
    coroutineScope { // creates a CoroutineScope for launching the actual work
        val completionSignal = Job()
        val sendTask = PacketRequest(packet, this, completionSignal::complete)
        channel.send(sendTask)
        completionSignal.join() //Keep CoroutineScope alive so it can be used by worker.
    }
}

suspend fun worker() {
//    while (true) { // loop disabled so this demo terminates after one packet
        val (packet, requesterScope, onComplete) = channel.receive()
        val job = requesterScope.launch {
            withContext(Dispatchers.IO) {
                doBlockingSend(packet)
            }
            onComplete()
        }
        job.join()
//    }
}

data class PacketRequest(
    val packet: Packet,
    val requesterScope: CoroutineScope,
    val onComplete: () -> Unit
)

A worker can ensure that only one packet is processed at a time, but the actual processing can still happen under the Job hierarchy of the request’s sender. Cancellation will propagate from sender parent to child task and exceptions will propagate from child task to sender parent.
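To see cancellation propagate, here is a variation of the example above in which the task runs inside runInterruptible and the sender is cancelled mid-send. Because the task is a child of the sender’s coroutineScope, cancelling the sender cancels the task, the blocking call is interrupted, and the worker is free again almost immediately. The String packet and the interruptible doBlockingSend are assumptions made for the demo.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.system.measureTimeMillis

class PacketRequest(
    val packet: String,
    val requesterScope: CoroutineScope,
    val onComplete: () -> Unit
)

// Assumed to respond to Thread.interrupt(), as Thread.sleep does.
fun doBlockingSend(packet: String) {
    Thread.sleep(10_000)
}

suspend fun sendPacket(channel: Channel<PacketRequest>, packet: String) = coroutineScope {
    val completionSignal = Job()
    channel.send(PacketRequest(packet, this) { completionSignal.complete() })
    completionSignal.join() // keep this scope alive so the worker can launch into it
}

suspend fun worker(channel: Channel<PacketRequest>) {
    for (request in channel) {
        // The task is a child of the sender's scope, so cancelling the sender cancels it.
        val task = request.requesterScope.launch {
            runInterruptible(Dispatchers.IO) { doBlockingSend(request.packet) }
            request.onComplete()
        }
        task.join() // still one packet at a time
    }
}

fun timeUntilWorkerIsFree(): Long = runBlocking {
    val channel = Channel<PacketRequest>()
    val workerJob = launch { worker(channel) }
    val sender = launch { sendPacket(channel, "hello") }
    delay(100) // let the task enter doBlockingSend
    measureTimeMillis {
        sender.cancelAndJoin() // cancels the sender's scope and therefore the task
        channel.close()
        workerJob.join()
    }
}

fun main() {
    println("worker free after ${timeUntilWorkerIsFree()} ms")
}
```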

Note that by using a worker, you also have to deal with what happens if the worker is cancelled before all packets have been sent/processed.
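One way to soften that problem, sketched here as an assumption rather than a complete answer: shut the worker down by closing the channel instead of cancelling it. close() stops new sends, but elements already in the channel are still delivered, so the worker’s for-loop drains them before finishing. The buffered Channel<Int> and delay are purely illustrative.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun drainOnShutdown(): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 10)
    val handled = mutableListOf<Int>()
    val worker = launch {
        for (item in channel) {
            delay(10) // simulate work
            handled += item
        }
    }
    repeat(5) { channel.send(it) } // buffered; the worker may not have processed them yet
    channel.close() // no new items; buffered ones are still delivered
    worker.join()   // cancelling the worker here instead would drop the buffered items
    handled
}

fun main() {
    println(drainOnShutdown()) // prints [0, 1, 2, 3, 4]
}
```

Senders still suspended in join() when a worker is cancelled outright would need their jobs cancelled separately, or they would wait forever.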

Thanks for your reply. Well, the (sparse) documentation of that old code indicated that any ongoing write attempt is exempt from that “reply immediately with ACK” rule, but that any subsequent write call must wait until the ACK is sent, and that the ACK response must be sent ASAP (that is, it can’t just be prepended to some later write operation).

I did some further digging, though, and it seems this may not be necessary after all. That changes everything: if the ACK response does not have to be sent immediately, it is fine to just rely on withContext(mySingleThreadedDispatcher). So I simplified that worker (I still use it for receiving packets), and it has worked well so far. Here’s hoping I can stick with this, because I agree that having to use a worker for writing complicates things significantly.