Coroutines: best way to use fork and join?

With coroutines, there are so many data structures and ways to realize something, I find it easy to lose the overview.

I’d like your opinion on what would be the slickest (readability + speed) way to conditionally collect a bunch of computations executed in parallel into an unordered list.

Currently I use something like this, but I wonder if there is something shorter and pure Kotlin:

launch + ConcurrentLinkedQueue

val unorderedResults = ConcurrentLinkedQueue<Int>()
runBlocking {
  for(i in 0..1000) {
    launch {
      if (i % 2 == 0) unorderedResults.add(i*i)
    }
  }
}

I’ve looked at the documentation and found two other ways to do it:

async

val unorderedResults = runBlocking {
  (0..1000).map { i ->
    async {
      if (i % 2 == 0) i*i else null
    }
  }.awaitAll().filterNotNull()
}

It is not really shorter, though, and there is this extra filter step. Personally, I also don’t find it more readable. Curiously, replacing the first map with mapNotNull does not change the type of the list returned by awaitAll to List<Int>; it is still List<Int?>.
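One way to read the mapNotNull observation: on the range, mapNotNull only sees the Deferred objects returned by async, and those are never null; the nullable Int exists only once a Deferred has been awaited. A minimal sketch, assuming kotlinx.coroutines is on the classpath (squaresOfEvens is a hypothetical helper name, not something from the snippets above), moves mapNotNull to the awaiting step instead:

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper, for illustration only.
fun squaresOfEvens(range: IntRange): List<Int> = runBlocking {
    // Using mapNotNull here would filter Deferred objects, which are never null,
    // so the element type would stay Deferred<Int?>.
    val deferred: List<Deferred<Int?>> = range.map { i ->
        async { if (i % 2 == 0) i * i else null }
    }
    // Await each result and drop the nulls in a single pass.
    // Results come back in input order, not in completion order.
    deferred.mapNotNull { it.await() }
}
```

Calling squaresOfEvens(0..1000) returns a List<Int> without a separate filterNotNull step. Note that awaiting sequentially like this is not identical to awaitAll when a computation fails: awaitAll fails as soon as any Deferred fails, whereas the loop notices a failure only when it reaches that element.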

Channels

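val unorderedResults = runBlocking {
  val channel = Channel<Int>()
  launch {
    // coroutineScope returns only after all producers have finished
    coroutineScope {
      for (i in 0..1000) {
        launch {
          if (i % 2 == 0) channel.send(i*i)
        }
      }
    }
    // without closing the channel, consumeEach below would suspend forever
    channel.close()
  }
  val list = ArrayList<Int>()
  channel.consumeEach { list.add(it) }
  list
}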

From the documentation, I had the impression that this is the intended way to do it, because it mentions that channels are basically Kotlin’s counterpart to blocking queues. I am not sure whether collecting the items sent to the channel into a list could be a one-liner, since map etc. on channels are deprecated in favour of Flow.

I often use the async version, but, for your use case, you can consider:

    val unorderedResults = runBlocking(Dispatchers.Default) {
        channelFlow {
            for (i in 0..1000) {
                launch {
                    if (i % 2 == 0) send(i * i)
                }
            }
        }.toList()
    }

Finally, please read the runBlocking documentation carefully.


That looks good, thank you!

With regard to runBlocking: I am not using coroutines comprehensively, but only in some places like these, to fan out and merge (or just launch) certain tasks that can run in parallel. So I often have this situation:

val result = someClass.doSomeComputationWith(inputSet)

…written in non-suspending code, and I just want some parallelism within that particular function. That is, there is no runBlocking in the main function with the whole codebase suspending; it is pretty much the other way round.
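The situation described above could be sketched roughly as follows. This is only an illustration under assumptions: the function body, the Set<Int> input type, and the squaring of even numbers are stand-ins borrowed from the earlier snippets, not the real doSomeComputationWith.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for someClass.doSomeComputationWith(inputSet).
// The signature is non-suspending; the parallelism stays an internal detail.
fun doSomeComputationWith(inputSet: Set<Int>): List<Int> =
    // runBlocking blocks the calling thread until all child coroutines finish,
    // so callers see an ordinary blocking function.
    runBlocking(Dispatchers.Default) {
        inputSet
            .map { i -> async { if (i % 2 == 0) i * i else null } } // fan out
            .awaitAll()                                              // join
            .filterNotNull()
    }
```

Non-suspending callers can use it like any other function, e.g. val result = doSomeComputationWith(setOf(1, 2, 3, 4)). The main caveat from the runBlocking documentation still applies: because it blocks the calling thread, such a function should not itself be called from inside a coroutine.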
