I have a Flow like this:
    runBlocking {
        val values = (0..10).asFlow().map {
            println("generating $it")
            delay(250)
            if (it == 3) {
                throw RuntimeException("error with $it")
            }
            "$it"
        }
        println(values.toList())
    }
On error, I want to retry only the failing element n times, not the whole stream.
If the retries fail, I want to skip the element and continue with the rest of the stream.
Thanks for the help,
Artur
First of all, this isn't supported in the general case, because a Flow works the same way as forEach (but with async logic): an exception thrown while processing one element ends the whole iteration.
However, I use the function below. I'm not sure it is entirely correct, but it works in my case:
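To illustrate the point about forEach, here is a minimal sketch (the helper name collectUntilFailure is made up for this example). It runs the flow from the question without the delay and records which elements the map lambda actually saw. The catch operator keeps the exception from propagating, but the flow still ends at the failing element:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (elements the map lambda saw, elements collected).
fun collectUntilFailure(): Pair<List<Int>, List<String>> = runBlocking {
    val seen = mutableListOf<Int>()
    val collected = (0..10).asFlow()
        .map {
            seen += it
            if (it == 3) throw RuntimeException("error with $it")
            "$it"
        }
        // catch handles the upstream exception, but the flow completes here;
        // elements 4..10 are never requested.
        .catch { e -> println("flow terminated: ${e.message}") }
        .toList()
    seen to collected
}

fun main() {
    val (seen, collected) = collectUntilFailure()
    println(seen)      // [0, 1, 2, 3]
    println(collected) // [0, 1, 2]
}
```

So any per-element recovery has to happen inside the lambda passed to the operator, before the exception escapes into the flow.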
    fun <TInputData, TOutputData : Any> Flow<TInputData>.mapWithTryCatch(
        logger: KLogger,
        action: suspend (TInputData) -> TOutputData
    ) = mapNotNull { data ->
        try {
            action(data)
        } catch (e: Throwable) {
            // Rethrow cancellation first so it is never logged as a processing error or swallowed.
            if (e is CancellationException || e.cause is CancellationException) {
                throw e
            }
            logger.error(e) {
                "Unable to process $data"
            }
            // Returning null makes mapNotNull drop the failed element.
            null
        }
    }
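Note that the function above skips a failing element but does not retry it, and the built-in retry operator restarts the whole upstream flow rather than a single element. For the "retry n times, then skip" part of the question, a variant along the following lines might work. This is only a sketch: mapWithRetryOrSkip and its attempts parameter are names I made up, not part of kotlinx.coroutines, and println stands in for the logger.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: runs `action` for each element up to `attempts` times
// and drops the element if every attempt fails.
fun <T, R : Any> Flow<T>.mapWithRetryOrSkip(
    attempts: Int,
    action: suspend (T) -> R
): Flow<R> {
    require(attempts >= 1) { "attempts must be at least 1" }
    return mapNotNull { value ->
        var result: R? = null
        for (attempt in 1..attempts) {
            try {
                result = action(value)
                break // success, stop retrying this element
            } catch (e: CancellationException) {
                throw e // never swallow cancellation
            } catch (e: Exception) {
                println("attempt $attempt/$attempts failed for $value: ${e.message}")
            }
        }
        result // null means every attempt failed, so mapNotNull skips the element
    }
}

fun main() = runBlocking {
    var callsForThree = 0
    val values = (0..5).asFlow().mapWithRetryOrSkip(attempts = 3) {
        if (it == 3) {
            callsForThree++
            throw RuntimeException("error with $it")
        }
        "$it"
    }.toList()
    println(values)        // [0, 1, 2, 4, 5]
    println(callsForThree) // 3
}
```

If the failures are transient, you would probably want a delay between attempts as well; I left that out to keep the sketch short.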
Thanks!
So what should I use instead of Flow to do it the Kotlin way?
You don't need anything instead of Flow. This is just an operator: use it in place of map, and the code below works. Remove the logger if you don't need it.
    private companion object : KLogging()

    runBlocking {
        val values = (0..10).asFlow().mapWithTryCatch(logger) {
            println("generating $it")
            delay(250)
            if (it == 3) {
                throw RuntimeException("error with $it")
            }
            "$it"
        }
        println(values.toList())
    }