How to convert Channel-based implementation to (State)Flow

I have a hard time to convert a Flow relying in its internals on BroadcastChannel, ReceiveChannel and select into a purely Flow based implementation. Foremost, I would like to have a correctly working replacement for the deprecated ReceiveChannel.filter{...} method.

I would appreciate it a lot if you could give me hints / rewrite my working Channel based implementation below.

I have tried to simplify the implementation as much as possible: conceptually it is a flow returning a chunk of numbers, then waits until it gets either: a heartbeat signal or a notification that new data can be fetched. This concept is used for a real world implementation for a ServerSentEvent stream.

Here is my working code - you can copy/paste it in your IDE and execute with the main method:


import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.select
import kotlin.random.Random

enum class DataTypes {
    NUMBERS, STRINGS;
}

class DataOrHeartBeatProducerFlow(val notificationChannel: BroadcastChannel<
DataTypes>) {

    @ExperimentalCoroutinesApi
    fun eventsFlow(id: Long? = 0, filter: DataTypes): Flow = flow {
        //filter is the problematic part here: it is deprecated. The question is 
        //how to convert this into a purely Flow based approach
        val filteredNotificationChannel = notificationChannel
                .openSubscription()
                .filter { it == DataTypes.NUMBERS }

        suspend fun notificationOrHeartBeat(): DataTypes? = coroutineScope {
            val tickerChannel = ticker(1000)
            select {
                tickerChannel.onReceive {
                    //if ticker passes: heartbeat, which is null
                    null
                }
                filteredNotificationChannel.onReceive {
                    tickerChannel.cancel()
                    //if data notification: return datatypes received
                    it
                }
            }
        }

        var latestId = id ?: 0
        suspend fun fetch(offset: Long) = ((offset + 1)..(offset + 5)).toList().asFlow().collect { entity ->
            emit(entity.toString()).also { latestId = entity }
        }

        /**
         * Core method: wait for heartbeat or datatype notification.
         * - On Heartbeat return empty 'heartbeat'
         * - On datatype notification fetch records from given offset
         */
        tailrec suspend fun take(): Unit {
            notificationOrHeartBeat().let {
                if (it == null) emit("heartbeat") else fetch(latestId)
            }
            take()
        }

        take()
    }
}



@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() = runBlocking {

    val channel = BroadcastChannel<DataTypes>(Channel.CONFLATED)
    val f = DataOrHeartBeatProducerFlow(channel).eventsFlow(0, DataTypes.NUMBERS)
    GlobalScope.launch {
        repeat(100) {
            delay(500L)
            channel.send(
                if (Random.nextBoolean()) DataTypes.NUMBERS else DataTypes.STRINGS
            )
        }
    }
    f.collect { println(it) }

}

That would be Flow.filter

Instead of trying to recursively select with a Channel and timeout, you can use transformLatest to forward a value and then send out heartbeats after handling a value until a new value arrives. (You can prime the pump with an initial null value so heartbeats start running right away)

Just in case the real entity source that this dummy code is replacing gives a List rather that a Flow, remember that for-loops and forEach still work too and will be more readable than asFlow().collect :slight_smile:

This should give you a rough idea of what I mean:

val filteredNotifications = notificationChannel.asFlow()...

fun fetchedValuesOrNulls(offset: Long) = flow {
    emit(null) //cause heartbeats to start

    var latestId = offset
    filteredNotifications.collect {
        //fetch and emit the real entity values
    }
}

fun eventsFlow(id: Long) = fetchedValuesOrNulls(id).transformLatest {
    //transformLatest runs the lambda in it's own coroutine
    //the coroutine is cancelled when a new value arrives
    //delay will throw cancellation exception exiting the while loop below
    if (it != null) emit(it)
    while (true) {
        delay(1000)
        emit("heartbeat")
    }
}

Thanks a lot for your straightforward and much simpler solution. I love it :slight_smile:!. Knowing thy Flow primitives makes the difference…

I also changed the notificationChannel of type BroadcastChannel to MutableStateFlow so in the final solution I got rid of all ‘hot’ resources.

I’m happy with the result.