I want to:
- track many entities which publish their individual state asynchronously (perhaps they could emit through a
MutableSharedFlow
orChannel
) - relate their state to some asynchronously changing global state, in order to compute a secondary state for each of these entities
- I want to observe these secondary state changes, ideally through collecting a
Flow
.
The problem is perhaps a bit too abstract, so I’ve confabulated a toy example below. I’ve however encountered the abstract shape of this problem many times, such as when tracking variables in a formula or tracking relative locations of elements within their context, and I never really found a satisfactory way to solve it. Usually: ‘make everything single-threaded’ or ‘accept the redundant computations’.
So now for my toy example. Suppose that we are tracking the objective feeling of worry of gas consumers as a function of their individual gas reserves to the ever-fluctuating changing gas price.
A naive solution using Flows:
typealias CitizenId = String
enum class Worry {
Low, Medium, High
}
val gasReservesPerCitizen = MutableStateFlow<Map<CitizenId, Float>>(emptyMap()) // could change at any time
val centralizedGasPrice: MutableStateFlow<Float?> = MutableStateFlow(null) // could change at any time
// Relate the gas reserves for each citizen to the gas price in order to compute their worry.
// This is not so nice because we recompute every value each time
val worry: Flow<Map<CitizenId, Worry>?> =
centralizedGasPrice.combine(gasReservesPerCitizen) { price, gasTanks ->
price?.computeWorry(gasTanks)
}
private fun Float.computeWorry(
gasTanks: Map<CitizenId, Float>
): Map<CitizenId, Worry> {
val gasPrice = this
return gasTanks.mapValues { (_, gasReserve) -> particularWorry(gasPrice, gasReserve) }
}
private fun particularWorry(price: Float, gasReserve: Float): Worry {
return when (price * price - gasReserve) {
in Float.NEGATIVE_INFINITY..0F -> Worry.Low
in 0F..500F -> Worry.Medium
else -> Worry.High
}
}
While this works, it’s of course ridiculous to recompute the worry for each and every citizen every time one gas reserve changes. But when the gas price changes, we do want to recompute every citizen. So I guess it’s a concurrent memoization problem at the heart. I feel like would I could solve the problem if there was some observable variant of ConcurrentHashMap
that allows me put individual new values instead of performing Flow.combine
on the entire state. But I think ConcurrentHashMap
is not so straightforward to observe.
I’m not such an expert on threading, maybe I could just use a plain Map
with a mutex lock, but it feels scary because I will introduce complicated synchronization logic. How can I do this cleanly?