Hello, I am a newbie and would like to ask about the correct usage of coroutines and SharedFlow.
In the code below, I want the function to run two actions in parallel. I expect three possible results:
action1 returns first, and foo returns its result.
action2 returns first, and foo returns its result.
The whole operation times out after 3 s and throws a timeout error.
suspend fun foo() {
    val a = MutableSharedFlow<Int>()
    val sf = a.asSharedFlow()
    return withTimeout(3000) {
        launch {
            action1().let { a.emit(it) }
        }
        launch {
            action2().let { a.emit(it) }
        }
        sf.first {
            return@withTimeout it
        }
    }
}
My questions are:
After foo returns, are the two coroutines launched for action1 and action2 cancelled, or do they continue to run in the background?
Let’s say action1 returns but action2 never does (maybe because of a network issue). Although the code may already have left foo, will action2 keep the SharedFlow alive and leak it forever?
Do I need to do anything explicitly to close the MutableSharedFlow or SharedFlow declared inside foo, or will they be garbage collected normally after foo returns?
First of all, it doesn’t even compile, because we can’t return from withTimeout() inside the lambda passed to first(): that lambda is a predicate, not an inline block.
But most importantly, this code works very differently from what you expect. Remember that coroutines wait for all their children to finish. withTimeout() returns only after its body (the first() call) and both launched children, action1() and action2(), have finished. If action1 takes 500ms and action2 takes 2500ms, then this code will return the value of action1, but only after 2500ms. If action2 takes 4000ms, then it will throw a timeout even if action1 returned successfully long ago. To fix this, we have to explicitly cancel() the coroutines that are no longer needed.
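To make the waiting behaviour concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The names waitsForChildren and cancelsChildren are hypothetical, and delay() stands in for a slow action. It compares a block that simply returns with one that cancels its leftover children first:

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// The block produces its value immediately, but withTimeout
// still waits for the launched child before returning.
suspend fun waitsForChildren(): Int = withTimeout(3000) {
    launch { delay(1000) } // hypothetical slow child
    42
}

// Same block, but the leftover child is cancelled before returning,
// so nothing is left to wait for.
suspend fun cancelsChildren(): Int = withTimeout(3000) {
    launch { delay(1000) } // hypothetical slow child
    coroutineContext.cancelChildren()
    42
}

fun main() = runBlocking {
    val slow = measureTimeMillis { waitsForChildren() }
    val fast = measureTimeMillis { cancelsChildren() }
    println("without cancel: $slow ms, with cancel: $fast ms")
}
```

The first call should take about one second and the second should return almost immediately; the exact numbers vary from run to run.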
Also, this seems to me like a misuse of flows. Flows are for sharing data between components of the application, but it usually doesn’t make sense to use them internally in a function. If you used them simply to wait for the first suspend function to finish, then there is the select() function for such cases:
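import kotlinx.coroutines.async
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withTimeout

suspend fun foo(): Int = withTimeout(3000) {
    // Start both actions concurrently as children of the withTimeout scope.
    val value1 = async { action1() }
    val value2 = async { action2() }
    // select resumes with whichever Deferred completes first.
    select<Int> {
        value1.onAwait {
            value2.cancel() // the loser is no longer needed
            it
        }
        value2.onAwait {
            value1.cancel()
            it
        }
    }
}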
Still, it’s a little more code than I would expect for such a simple case. Maybe there is a better way.
Thanks for your answer, @broot!
I tried writing similar code before, in which action1 returns within a few hundred ms while action2 is unlikely to return at all. The result was that the whole call completed within a few hundred ms instead of waiting until the timeout.
Isn’t launch something like fire and forget? I thought foo would not necessarily have to wait for the actions to finish before returning.
But you are right, it makes more sense to use select.
Suspend functions generally should not “leak” any background tasks. They return only when they have finished everything they started.
No, launch() isn’t fire and forget. The whole idea of structured concurrency is to perform asynchronous work in a reliable manner. GlobalScope is fire and forget, and this is why it is marked as a delicate API.
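A small sketch of the difference, assuming kotlinx.coroutines. The function names structured and fireAndForget and the AtomicBoolean flag are hypothetical, used only for illustration:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Structured: coroutineScope does not return until the launched child finishes.
suspend fun structured(done: AtomicBoolean) {
    coroutineScope {
        launch { delay(200); done.set(true) }
    }
}

// Fire and forget: GlobalScope.launch returns at once; nothing waits for the job.
@OptIn(DelicateCoroutinesApi::class)
fun fireAndForget(done: AtomicBoolean): Job =
    GlobalScope.launch { delay(200); done.set(true) }

fun main() = runBlocking {
    val a = AtomicBoolean(false)
    structured(a)
    // The child has already completed by the time structured() returns.
    println("structured returned, child finished: ${a.get()}")

    val b = AtomicBoolean(false)
    val job = fireAndForget(b)
    // The child needs about 200 ms, so it is still running in the background here.
    println("GlobalScope returned, child finished: ${b.get()}")
    job.join() // only so the demo does not exit while the job is still running
}
```

With GlobalScope, the caller has to keep the returned Job and join or cancel it by hand, which is exactly the bookkeeping that structured concurrency does for you.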
I would reconsider the advice against using flows inside a function. Flow can be a valuable tool anytime you are dealing with multiple asynchronous values that are delivered over time. The Flow operators automatically take care of things like upstream cancellation, which is helpful even if you are collecting the result internally.
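As one possible illustration of that point, here is a sketch of foo built from cold flows with merge() and first(), assuming kotlinx.coroutines. The bodies of action1 and action2 are hypothetical stand-ins for the real work in the question:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the actions in the question.
suspend fun action1(): Int { delay(200); return 1 }
suspend fun action2(): Int { delay(60_000); return 2 } // effectively never returns

suspend fun foo(): Int = withTimeout(3000) {
    // merge() collects both flows concurrently; first() takes the first
    // emitted value and cancels the rest of the upstream, so the slower
    // action is cancelled without any explicit cancel() call.
    merge(
        flow { emit(action1()) },
        flow { emit(action2()) }
    ).first()
}

fun main() = runBlocking {
    println(foo())
}
```

With these stand-ins, foo returns the value from action1 well before the timeout, because first() stops collecting as soon as one value arrives. If neither action finishes within 3 s, withTimeout throws as before.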