Execute steps in a flow concurrently

I have an infinite flow. For each event I get, I fetch a bunch of callbacks that are interested in the event. The callbacks are suspend functions that will do remote calls and are independent from each other.

After some googling, I found that the Flow API offers, for example, buffer() and flowOn() to address this. However, I can’t get it to work in my test code.

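import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.reflect.KFunction
import kotlin.reflect.full.callSuspend
import kotlin.system.measureTimeMillis

object Foo {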
	suspend fun cb1(): Unit = delay(1000)
	suspend fun cb2(): Unit = delay(1000)
	suspend fun cb3(): Unit = delay(1000)
}

private fun getCallbacks(): Flow<Pair<Any, KFunction<Unit>>> = flow {
	emit(Foo to Foo::cb1)
	emit(Foo to Foo::cb2)
	emit(Foo to Foo::cb3)
}


suspend fun main(args: Array<String>) {
	val millis = measureTimeMillis {
		val job = GlobalScope.launch {
			(0 until 3).asFlow()
				.flatMapMerge { getCallbacks() }
				.buffer(capacity = 64)
				.flowOn(Dispatchers.Default)
				.collect { it.second.callSuspend() }
		}

		job.join()
	}

	println("Took $millis")
}

I expect it to take 3 seconds: for the first event, grab all the callbacks and run them concurrently, then do the same for the next emitted event, and so on. However, it takes 9 seconds to execute, which indicates that the callbacks run sequentially, without any concurrency.

What am I not understanding?

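Flows are sequential by design: the collector handles one element at a time, so each callSuspend() call in collect finishes before the next one starts. buffer() and flowOn() only let the upstream emit concurrently with the collector; they do not run the collector's body in parallel. See this issue for progress on parallel flows.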
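
In the meantime, one possible workaround is to start the callbacks of each event as child coroutines inside the collector. The sketch below is only an illustration under assumptions: callback, callbacksFor and handleEvents are hypothetical names that stand in for your remote calls and lookup, and plain suspend lambdas are used instead of KFunction and callSuspend to keep it self-contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for one remote callback: just suspends for a second.
fun callback(event: Int, index: Int): suspend () -> Unit = { delay(1000) }

// Hypothetical lookup of the callbacks interested in an event.
fun callbacksFor(event: Int): List<suspend () -> Unit> =
    listOf(callback(event, 1), callback(event, 2), callback(event, 3))

suspend fun handleEvents(events: Flow<Int>) {
    events.collect { event ->
        // coroutineScope returns only after all launched children finish,
        // so events are still handled one after another, but the callbacks
        // of a single event overlap.
        coroutineScope {
            callbacksFor(event).forEach { cb -> launch { cb() } }
        }
    }
}

fun main() = runBlocking {
    val millis = measureTimeMillis { handleEvents((0 until 3).asFlow()) }
    // Should be roughly 3 seconds rather than 9.
    println("Took $millis")
}
```

The flow itself stays sequential here; only the work done per element fans out. If the events may overlap as well, another option is to turn each callback into its own small flow and merge them with flatMapMerge, at the cost of losing the per-event ordering.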
