I am trying to implement a flow collection that has a graceful shutdown. In other words, it will stop consuming messages on cancellation, but still attempt to complete its last (potentially suspending) collection action. For example:
val job = generateSequence(0) { it + 1 }.asFlow().take(10).cancellable()
.onEach {
println(it)
// causes block to be suspending and cancellable
delay(1000)
println("$it completed")
}.launchIn(CoroutineScope(Dispatchers.Default))
delay(2000)
job.cancelAndJoin()
I’ve tried a few different implementations but none of them are particularly satisfying. I can use withContext(NonCancellable) and withTimeout() but that makes every collection action have a timeout instead of just the last possible one. Is there a language construct to accommodate this or would I have to implement a custom context that would handle a cancellation this way?
From your description it is not clear, what timeout do you mean. NonCancellable by itself should do what you asked for. Do you want to postpone cancellation for specified amount of time?
Also, graceful shutdowns usually utilize two events: first, the component is notified about the shutdown and then it is forced to stop. You can’t do both by cancellation, you would need some additional mechanism.
Yeah I was hoping to postpone cancellation to allow time for the last collection to process, but not accept any more flow emissions.
Your second point is a good one. Is there a way to get an infinite flow to stop without cancelling? Seems like I’d need to intercept the messaging and provide a custom type that implies it’s a stop and then cancel from within the NonCancellable context.
I tried to come up with a way to use that but it seems to fire only after the collection is cancelled, so the only thing I could think of was to save the state and then in the onComplete run the logic again but that doesn’t seem like a great solution since it potentially duplicates logic.