I have an infinite flow. For each event I get, I fetch a bunch of callbacks that are interested in the event. The callbacks are suspend functions that will do remote calls and are independent from each other.
After some googling, I found that the Flow API offer for example buffer() or flowOn() to address this. However, I can’t get it to work in my test code.
object Foo {
suspend fun cb1(): Unit = delay(1000)
suspend fun cb2(): Unit = delay(1000)
suspend fun cb3(): Unit = delay(1000)
}
private fun getCallbacks(): Flow<Pair<Any, KFunction<Unit>>> = flow {
emit(Foo to Foo::cb1)
emit(Foo to Foo::cb2)
emit(Foo to Foo::cb3)
}
suspend fun main(args: Array<String>) {
val millis = measureTimeMillis {
val job = GlobalScope.launch {
(0 until 3).asFlow()
.flatMapMerge { getCallbacks() }
.buffer(capacity = 64)
.flowOn(Dispatchers.Default)
.collect { it.second.callSuspend() }
}
job.join()
}
println("Took $millis")
}
I expect it to take 3 seconds. For example, for the first event, grab all the callbacks and run them concurrently. And then to the same for the next emitted event and so on… However, this takes 9 seconds to execute, indicating that it’s executing sequentially, without any concurrency.
What am I not understanding?