I am having a hard time converting a Flow that internally relies on `BroadcastChannel`, `ReceiveChannel` and `select` into a purely `Flow`-based implementation. Above all, I would like a correctly working replacement for the deprecated `ReceiveChannel.filter { ... }` method.
I would appreciate it a lot if you could give me hints / rewrite my working Channel based implementation below.
I have tried to simplify the implementation as much as possible: conceptually, it is a flow that returns a chunk of numbers and then waits until it receives either a heartbeat signal or a notification that new data can be fetched. This concept is used in a real-world implementation of a Server-Sent Events stream.
Here is my working code - you can copy and paste it into your IDE and run it via the `main` method:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.select
import kotlin.random.Random

/** Kinds of data a notification can announce. */
enum class DataTypes {
    NUMBERS,
    STRINGS
}

/**
 * Produces a stream of string events that interleaves data chunks with heartbeats.
 *
 * @property notificationChannel broadcast channel on which "new data available" notifications arrive.
 * @param heartbeatIntervalMillis how long to wait for a matching notification before a
 *   `"heartbeat"` element is emitted instead. Defaults to 1000 ms.
 */
class DataOrHeartBeatProducerFlow(
    val notificationChannel: BroadcastChannel<DataTypes>,
    private val heartbeatIntervalMillis: Long = 1000
) {

    /**
     * Returns a cold flow that repeatedly waits for either a heartbeat tick or a notification
     * of type [filter], then emits accordingly:
     *
     * - heartbeat tick: emits the literal string `"heartbeat"`;
     * - matching notification: emits the next five consecutive ids (as strings) following the
     *   last emitted id.
     *
     * Notifications of any other type are ignored and do not restart the heartbeat timer.
     * The flow never completes on its own; the subscription and the timer are released when
     * the collector is cancelled or fails.
     *
     * @param id the id after which numbering starts; `null` is treated as `0`.
     * @param filter the only notification type that triggers a data fetch.
     */
    @ExperimentalCoroutinesApi
    fun eventsFlow(id: Long? = 0, filter: DataTypes): Flow<String> = flow {
        // One subscription per collection; it is cancelled in the `finally` below.
        val subscription = notificationChannel.openSubscription()
        var latestId: Long = id ?: 0L

        /**
         * Suspends until the heartbeat ticker fires (returns `null`) or a notification equal
         * to [filter] arrives (returns it). Non-matching notifications are skipped in-loop,
         * which replaces the deprecated `ReceiveChannel.filter` operator.
         */
        suspend fun notificationOrHeartBeat(): DataTypes? {
            val tickerChannel = ticker(heartbeatIntervalMillis)
            return try {
                var received: DataTypes?
                do {
                    received = select<DataTypes?> {
                        // A tick means no matching notification arrived in time: heartbeat.
                        tickerChannel.onReceive { null }
                        subscription.onReceive { it }
                    }
                } while (received != null && received != filter)
                received
            } finally {
                // Always stop the ticker, on both branches and on cancellation, so no timer leaks.
                tickerChannel.cancel()
            }
        }

        /** Emits the five ids following [offset] and records the last one emitted. */
        suspend fun fetch(offset: Long) {
            for (entity in (offset + 1)..(offset + 5)) {
                emit(entity.toString())
                latestId = entity
            }
        }

        try {
            // Core loop: wait for heartbeat or data notification, then emit accordingly.
            while (true) {
                if (notificationOrHeartBeat() == null) emit("heartbeat") else fetch(latestId)
            }
        } finally {
            subscription.cancel()
        }
    }
}
/**
 * Demo entry point: collects the events flow and prints every element, while a child
 * coroutine sends 100 random notifications (one every 500 ms) to the broadcast channel.
 *
 * The flow never completes, so this function runs until the process is terminated.
 */
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() = runBlocking {
    val channel = BroadcastChannel<DataTypes>(Channel.CONFLATED)
    val events = DataOrHeartBeatProducerFlow(channel).eventsFlow(0, DataTypes.NUMBERS)

    // Launched as a child of runBlocking (not GlobalScope) so the producer is tied to
    // this scope and is cancelled together with it.
    launch {
        repeat(100) {
            delay(500L)
            channel.send(if (Random.nextBoolean()) DataTypes.NUMBERS else DataTypes.STRINGS)
        }
    }

    events.collect { println(it) }
}