How to merge flows while preserving the order

I have a question about Kotlin Flow. I want the parallelism of merge() while still preserving the order of the flows.

Here is a use case. Let’s say there are two network APIs:

  • one is to get a list of IDs; and
  • the other takes an ID and returns a user

I want to

  1. Use the 1st API to get the IDs
  2. Use the 2nd API to get all the users
  3. Return the list of the users to UI for display (e.g. on Android)

To me, this looks like a perfect use case for Kotlin flows. I would probably do it like this:

// NetworkApi.kt

interface NetworkApi {

  @GET("ids")
  suspend fun getIds(): List<Int>

  @GET("users/{id}")
  suspend fun getUser(@Path("id") id: Int): User
}

And here is how I would call these APIs, using flow:

val usersFlow = flow {
    emit(networkApi.getIds())
  }.map { // it: List<Int>; map (not flatMapConcat) because the lambda returns a List, not a Flow
    val flowList = mutableListOf<Flow<User>>()
    val userList = mutableListOf<User>()
    
    for (id in it) {
      flowList.add(flow{ emit(networkApi.getUser(id)) })
    }
    flowList.merge().toList(userList)
    userList
  }

Note that I used merge() because I want to send the user requests ASAP in parallel, rather than one after another.

However, the documentation of merge() says that it merges flows without “preserving an order of elements”.

Is there a way to have both: parallelism and preserved order?

Thanks!

This may generate N coroutines (and N concurrent requests, unless your API client limits it somehow), where N is the number of IDs.

Are you ok with that? If so:

fun NetworkApi.getUsers() = channelFlow {
    getIds().forEach { id ->
        launch {
            send(getUser(id))
        }
    }
}

Just launching in a loop will not maintain order.

You could merge over getIds().withIndex() and sort the results by index afterwards.
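A minimal sketch of the index-and-sort idea, assuming a finite list of IDs. The User class, fetchUser() and fetchUsersSorted() are hypothetical stand-ins for the API in the question, not part of any library:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the User type from the question.
data class User(val id: Int)

// Hypothetical stand-in for networkApi.getUser(); ID 30 is deliberately slow.
suspend fun fetchUser(id: Int): User {
    delay(if (id == 30) 100L else 10L)
    return User(id)
}

// Runs all requests concurrently via merge(), then restores the input order.
suspend fun fetchUsersSorted(ids: List<Int>): List<User> =
    ids.withIndex()
        .map { (index, id) -> flow { emit(index to fetchUser(id)) } } // tag each result with its position
        .merge()                  // concurrent, arrival order is not guaranteed
        .toList()
        .sortedBy { it.first }    // sort by the original position
        .map { it.second }

fun main() = runBlocking {
    println(fetchUsersSorted(listOf(30, 10, 20)))
}
```

The trade-off is that nothing can be shown until every request has finished, since the sort needs the complete list.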

Alternatively, you could start the coroutines in a loop and keep the pending results in a list, which preserves the order:

getIds().map { async { getUser(it) } }.map { it.await() }
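The one-liner needs a CoroutineScope for async to run in. Here is a rough, self-contained sketch of how that could look; FakeApi, its artificial delays and getUsersInOrder() are made up for illustration and only mimic the NetworkApi from the question:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the User type and NetworkApi from the question.
data class User(val id: Int)

class FakeApi {
    suspend fun getIds(): List<Int> = listOf(1, 2, 3)

    // Lower IDs take longer, so the responses arrive in reverse order.
    suspend fun getUser(id: Int): User {
        delay((4 - id) * 50L)
        return User(id)
    }
}

// Starts every request up front, then collects the results in the order of the IDs.
suspend fun FakeApi.getUsersInOrder(): List<User> = coroutineScope {
    getIds()
        .map { id -> async { getUser(id) } } // all requests are launched here
        .awaitAll()                          // results follow the list order, not the arrival order
}

fun main() = runBlocking {
    println(FakeApi().getUsersInOrder()) // prints [User(id=1), User(id=2), User(id=3)]
}
```

awaitAll() behaves like map { it.await() } here, except that it fails as soon as any one request fails instead of waiting for the earlier ones first.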

Thank you for the answer. Isn't channelFlow overkill, though? I think the answer below with async and await (or awaitAll) is easier to understand. But thanks anyway!


I like this approach a lot.
Thanks!

It depends on your use case. A Flow is a (potentially lazy, potentially infinite) stream of values.
If you don’t need a stream, then this is obviously overkill: just use a plain List.

There is no simple way to get an ordered stream.
For that, you need some sort of ordering buffer: request values in small batches, and
emit values into the Flow 1) when the buffer is full; 2) periodically, when the buffer has a continuous sequence of IDs at the beginning.
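A simplified sketch of the batching part of that idea, assuming the IDs are already known as a finite list. It only emits once a whole batch has completed and leaves out the periodic "continuous prefix" flush. fetchUser(), usersInBatches() and the batchSize parameter are hypothetical names:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the User type from the question.
data class User(val id: Int)

// Hypothetical stand-in for networkApi.getUser() with uneven latency.
suspend fun fetchUser(id: Int): User {
    delay((id % 3) * 20L)
    return User(id)
}

// Requests are concurrent within a batch; batches run one after another,
// so users are emitted in the order of the IDs.
fun usersInBatches(ids: List<Int>, batchSize: Int): Flow<User> = flow {
    for (batch in ids.chunked(batchSize)) {
        // The "buffer": wait until every request of the batch has completed.
        val users = coroutineScope {
            batch.map { id -> async { fetchUser(id) } }.awaitAll()
        }
        users.forEach { emit(it) } // emit in ID order once the batch is full
    }
}

fun main() = runBlocking {
    println(usersInBatches(listOf(1, 2, 3, 4, 5), batchSize = 2).toList())
}
```

A larger batchSize gives more parallelism but delays the first emission; a batchSize of 1 degenerates into fully sequential requests.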
