Coroutine select - need onJoin or onAwait that cancels if not chosen?

If I’m not mistaken, calling onJoin on a Job inside select will NOT result in the job being canceled if another clause is chosen (same with onAwait.) However, this is absolutely functionality I will regularly use in the context of a game engine – for example, implementing a select clause like onEvent by launching a job that will invoke a suspending function under the hood (i.e. the non-select implementation) then invoking onAwait on it, ideally. However the job should be completely encapsulated here, and onAwait does not appear to cancel it when not chosen (meaning it leaks.)

To get around this, I’ve implemented my own SelectClause1 that’s virtually identical to an internal implementation, but simply calls job.cancel inside disposeOnSelect. But this seems really verbose and unnatural given the simplicity of what I want to do.

Am I misunderstanding something, or going about this the wrong way? If not, then may I propose adding functions like onJoinOrCancel and onAwaitOrCancel to kotlinx.coroutines.selects? Thank you.

Simplified example of what I’m trying to do:

class Events {

    private val contWaitingByEvent = HashMap<Event, ArrayList<CancellableContinuation<Any?>>>()

    fun fire(event: Event, value: Any?) {
        val list = contWaitingByEvent[event]
        list.forEach { it.resume(value) }
        list.clear()
    }

    // "Normal" version of functionality:
    suspend fun waitFor(event: Event): Any? {
        val eventList = contWaitingByEvent.getOrPut(event) { ArrayList() }
        return suspendCancellableCoroutine { cont ->
            eventList += cont
            cont.invokeOnCancellation {
                contWaitingByEvent[event]?.remove(cont)
            }
        }
    }

    // "Select" version I'm trying to implement:
    context(CoroutineScope, SelectBuilder<R>)
    inline fun <R> onEvent(event: Event, crossinline block: suspend (Any?) -> R) {
        async { waitFor(event) }.onAwait { block(it) } // this leaks when not selected, i.e. it isn't canceled!
    }
}

(I don’t like the dependence on CoroutineScope, which is why I’m wondering if I’m going about this completely wrong. Thanks for reading.)

Never mind, I solved my problem by avoiding jobs altogether and increasing implementation complexity of the Events class - basically, it now enqueues either: continuations, calling resume or cancel on them; or lambdas, which my custom SelectClause1 creates, and are responsible for invoking the select clause’s block followed by select.continuation. The clause also calls select.disposeOnSelect with code to deregister the lambda from Events if the clause isn’t chosen.

For anyone who might encounter the same problem, flow merge would be a nice workaround:

// This does not cancel unselected job
val selected = select {
  async { suspendTask1() }.onAwait { it }
  async { suspendTask2() }.onAwait { it }
}
// This cancels unselected job
val selected = merge(
  flow { emit(suspendTask1()) }
  flow { emit(suspendTask2()) }
}.first()
1 Like