Would the StateFlows be retained via the scope passed into stateIn
after they have been removed from the flows map?
inline fun <K, T, R> SharedFlow<Map<K, T>>.flatMapLatestValues(
crossinline transform: suspend (value: T) -> Flow<R>
): Flow<Map<K, R>> = distinctUntilChangedBy { it.keys }
.runningFold(emptyMap<K, StateFlow<Pair<K, R>>>()) { flows, values ->
val scope = CoroutineScope(currentCoroutineContext())
values.mapValues { (key) ->
flows[key] ?: this.mapNotNull { it[key] }
.distinctUntilChanged()
.flatMapLatest(transform)
.map { value -> key to value }
.stateIn(scope)
}
}
.flatMapLatest { flows -> flows.values.combine { values -> values.associate { it } } }
To mitigate that possibility I originally implemented it with produceIn
so I could cancel them on removal from the flows map but the combine operator needs the replay of the current value feature of a StateFlow for the flows reused on emission to runningFold
.
inline fun <K, T, R> SharedFlow<Map<K, T>>.flatMapLatestValues(
crossinline transform: suspend (value: T) -> Flow<R>
): Flow<Map<K, R>> = distinctUntilChangedBy { it.keys }
.runningFold(emptyMap<K, ReceiveChannel<Pair<K, R>>>()) { flows, values ->
val scope = CoroutineScope(currentCoroutineContext())
flows.minus(values).values.forEach { it.cancel() }
values.mapValues { (key) ->
flows[key] ?: this.mapNotNull { it[key] }
.distinctUntilChanged()
.flatMapLatest(transform)
.map { value -> key to value }
.conflate()
.produceIn(scope)
}
}
.flatMapLatest { flows -> flows.values.map { it.receiveAsFlow() }.combine { values -> values.associate { it } } }