Coroutines/Flow: keeping track of many concurrent 'sub-states'

I want to:

  • track many entities which publish their individual state asynchronously (perhaps they could emit through a MutableSharedFlow or Channel)
  • relate their state to some asynchronously changing global state, in order to compute a secondary state for each of these entities
  • I want to observe these secondary state changes, ideally through collecting a Flow.

The problem is perhaps a bit too abstract, so I’ve confabulated a toy example below. I’ve however encountered the abstract shape of this problem many times, such as when tracking variables in a formula or tracking relative locations of elements within their context, and I never really found a satisfactory way to solve it. Usually: ‘make everything single-threaded’ or ‘accept the redundant computations’.

So now for my toy example. Suppose that we are tracking the objective feeling of worry of gas consumers as a function of their individual gas reserves to the ever-fluctuating changing gas price.

A naive solution using Flows:

typealias CitizenId = String

enum class Worry {
    Low, Medium, High
} 

val gasReservesPerCitizen = MutableStateFlow<Map<CitizenId, Float>>(emptyMap()) // could change at any time
val centralizedGasPrice: MutableStateFlow<Float?> = MutableStateFlow(null) // could change at any time

 // Relate the gas reserves for each citizen to the gas price in order to compute their worry. 
// This is not so nice because we recompute every value each time
val worry: Flow<Map<CitizenId, Worry>?> =
    centralizedGasPrice.combine(gasReservesPerCitizen) { price, gasTanks ->
        price?.computeWorry(gasTanks)
    }

private fun Float.computeWorry(
    gasTanks: Map<CitizenId, Float>
): Map<CitizenId, Worry> {
    val gasPrice = this
    return gasTanks.mapValues { (_, gasReserve) -> particularWorry(gasPrice, gasReserve) }
}

private fun particularWorry(price: Float, gasReserve: Float): Worry {
    return when (price * price - gasReserve) {
        in Float.NEGATIVE_INFINITY..0F -> Worry.Low
        in 0F..500F -> Worry.Medium
        else -> Worry.High
    }
}

While this works, it’s of course ridiculous to recompute the worry for each and every citizen every time one gas reserve changes. But when the gas price changes, we do want to recompute every citizen. So I guess it’s a concurrent memoization problem at the heart. I feel like would I could solve the problem if there was some observable variant of ConcurrentHashMap that allows me put individual new values instead of performing Flow.combine on the entire state. But I think ConcurrentHashMap is not so straightforward to observe.

I’m not such an expert on threading, maybe I could just use a plain Map with a mutex lock, but it feels scary because I will introduce complicated synchronization logic. How can I do this cleanly?

I think you already answered your main question: if you need to track many sub-states in relation to one global state without recalculating everything each time anything changes, then use combine().

Problem presented in your example is another story. It is really about observing changes to individual items in a regular collection. I don’t think this is easily doable. You would need to “diff” the map with the old value whenever it changes. Instead, you should implement (or find existing implementation) of observable map that would allow to create a separate flow per each key.

I’m not sure what do you mean about threading, mutexes and ConcurrentHashMap. Your problem, as you described it, doesn’t need any of these. It sounds like yet another problem.

Ahh, ok, so you need something like a mix of mapValues() and combine(). Sounds like a good idea for a project :slight_smile:

Yes! I think your idea for using a Map where each value is a Flow sounds interesting. Do you know of a reference implementation somewhere?

Regarding the Mutex, I imagined something like this,

val map: MutableMap<CitizenId, MutableSharedFlow<Float>> 
suspend fun emitGasReserve(id: CitizenId, reserve: Float) {
        mutex.withLock {
            map.getOrPut(id) { MutableSharedFlow() }.emit(reserve)
        }
    }

But I need to have some technique of observing any change in the map. It’s quite a conundrum for me, I need to think a little more about it.

Or you can go in the opposite direction and keep just a single Flow.

class GasState(val price: Float, val gasStatePerCitizen: Map<CitizenId, CitizenGasState>) {
    inner class CitizenGasState(val reserve: Float) {
        val worry = particularWorry(price, reserve)
    }
}
val state = MutableStateFlow<GasState>()

fun changeSomething(…) {
    // This update call is atomic
    // You can keep any relevant data from “prev”
    state.update { prev -> … }
}

val worry = state.map { s -> … }