I have a SharedFlow that forwards messages that were received from the network. Sometimes, these messages are repeated. Some subscribers expect these duplicates, others need them filtered out. (The values are null in case the received messages were found to be invalid.)
Some coroutines only occasionally need to sample the most recent flow value (that is, the most recently received message).
These messages must be parsed, but I do not want to parse every message as soon as it arrives, since that wastes CPU cycles: the parse result is only needed when a coroutine actually asks for a value. I use buffer, map, and first for that, like this:
val myFlow = theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST).map { parseMessage(it) }
val value = myFlow.first()
This works well. However, I do not know how to integrate duplicate detection into the flow. (Duplicates are detected by comparing classes.) My initial idea was to use distinctUntilChanged:
val myFlow = theSharedFlow
    .buffer(1, BufferOverflow.DROP_OLDEST)
    .map { parseMessage(it) }
    .distinctUntilChanged { old, new ->
        ((old == null) && (new == null)) ||
            ((old != null) && (new != null) && (old::class == new::class))
    }
val value = myFlow.first()
However, this does not work, since first cancels the distinctUntilChanged flow after getting a value, and every new call to first starts a fresh collection that has no memory of the previous value. So, I am confused as to how to do that. I can’t run distinctUntilChanged before buffer, since, as said, this would effectively run the parse function every time a message arrives.
So, I want to do the duplicate check on demand, when I actually need a value. This means that the previous value has to be stored somewhere outside the collection. But since first cancels the collection after getting an item, I do not see how I can do that. Any ideas? A second buffer, maybe?
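To make the "previous value has to be stored" idea concrete, here is a rough sketch of one direction I could imagine: keep the class of the last returned value in a small wrapper object, and use the predicate overload of first to skip duplicates. The name DistinctSampler and its next function are made up for illustration, and I have not verified that this is the idiomatic way to do it.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.reflect.KClass

// Hypothetical helper (not part of kotlinx.coroutines): remembers the class of
// the last value it returned, so the duplicate check survives across first() calls.
class DistinctSampler<T : Any>(private val parsed: Flow<T?>) {
    private val mutex = Mutex()          // serializes concurrent next() calls
    private var hasPrevious = false      // false until the first value was returned
    private var previousClass: KClass<out T>? = null  // null stands for an invalid (null) message

    // Suspends until the flow delivers a value whose class differs from the
    // previously returned one. Two nulls in a row count as duplicates.
    suspend fun next(): T? = mutex.withLock {
        val value = parsed.first { candidate ->
            !hasPrevious || candidate?.let { it::class } != previousClass
        }
        hasPrevious = true
        previousClass = value?.let { it::class }
        value
    }
}

// Intended use with the flow from the question (parseMessage is the question's function):
// val sampler = DistinctSampler(
//     theSharedFlow.buffer(1, BufferOverflow.DROP_OLDEST).map { parseMessage(it) }
// )
// val value = sampler.next()
```

Parsing still only happens while next() is collecting, but a call keeps parsing incoming messages until it sees a non-duplicate. If the SharedFlow has a replay cache, a second call with no new message in between would presumably see the replayed value as a duplicate and suspend until a different one arrives.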