Coroutines design advice

I am quite familiar with parallel programming using threads, and I have enough experience to be able to write robust threaded code.

However, in my current project I would like to get more experience with Kotlin coroutines, and I think I have found a good place to experiment with them. Before committing to a design, I’d like input from people with more Kotlin coroutine experience than me on what the best approach would be.

Reduced to its simplest form, the use case is as follows:

Assume a large array (millions of elements) and a function that performs an operation on a value and returns the result.

fun getResult(index: Int): Value

When computing the result, the implementation of getResult will recursively compute the result of some number of other indexes. In the most extreme case, reading the result of one index will implicitly compute the result of every single other value.

It is impossible to know beforehand which values will be retrieved, and in what order. However, in most cases all (or almost all) values will be computed.

Obviously the best way to handle this is to create an array of all the results, populate it with nulls, and then simply fill in the results as they are computed, returning previous results if they are available.
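To make the single-threaded baseline concrete, here is a minimal sketch of that null-filled result array. The class name MemoizedResults, the Long value type, and the Fibonacci-style dependency between indexes are assumptions made up for illustration; the real getResult would do its own computation.

```kotlin
// Hypothetical stand-in for the real computation: the value at `index`
// depends on the values at index - 1 and index - 2.
class MemoizedResults(size: Int) {
    // One slot per index; null means "not computed yet".
    private val cache = arrayOfNulls<Long>(size)

    // Counts how many values were actually computed (for illustration only).
    var computations = 0
        private set

    fun getResult(index: Int): Long {
        cache[index]?.let { return it }   // reuse an earlier result
        computations++
        val value = if (index < 2) index.toLong()
                    else getResult(index - 1) + getResult(index - 2)
        cache[index] = value              // fill in the slot for later callers
        return value
    }
}

fun main() {
    val results = MemoizedResults(50)
    println(results.getResult(40))
    println(results.computations)
}
```

Each index is computed at most once, but nothing here is thread-safe, and a deep dependency chain recurses on the call stack.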

Now that I’ve explained the scenario, the question is how I would parallelise this computation, specifically with coroutines in mind. I know how I would do it in pure Java with just threads and low-level concurrency primitives, but I can’t really get that design to fit with how you are supposed to do things with coroutines in Kotlin.

For this kind of memoization, I use a pattern involving:

  1. An AtomicReferenceArray<Deferred<Int>> to store the memos; and
  2. A CoroutineScope for launching async tasks, with a lifetime at least as long as that array.

getResult would be a suspending function. In the implementation, you would:

  1. See if there’s already a Deferred for the right value. If so, return await()
  2. If not, CAS in a new CompletableDeferred. If the CAS fails, go back to 1
  3. launch a coroutine in the above-mentioned scope that computes the value using whatever recursive call to getResult you like, and completes the CompletableDeferred
  4. return await()

Using a Deferred as the memoized value is the trick to easily ensuring that each value is computed by exactly one coroutine.

Note that when two threads try to CAS in a result at the same time, one of them will have allocated an unnecessary CompletableDeferred. As an optimization, you could recycle these to save future allocations.
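The four steps above can be sketched roughly as follows. This is only an illustration under some assumptions: the class name Memo and the compute function (each index depends on the previous one) are made up, and AtomicReferenceArray is the JVM class from java.util.concurrent, so a multiplatform project would need its own equivalent.

```kotlin
import java.util.concurrent.atomic.AtomicReferenceArray
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Memo(size: Int, private val scope: CoroutineScope) {
    // All slots start out null, meaning "nobody has claimed this index yet".
    private val memos = AtomicReferenceArray<Deferred<Int>>(size)

    suspend fun getResult(index: Int): Int {
        while (true) {
            // Step 1: someone already owns this index, so just wait for it.
            memos.get(index)?.let { return it.await() }

            // Step 2: try to claim the slot; if the CAS fails, go back to step 1.
            val mine = CompletableDeferred<Int>()
            if (!memos.compareAndSet(index, null, mine)) continue

            // Step 3: compute in the long-lived scope and publish the outcome.
            scope.launch {
                try {
                    mine.complete(compute(index))
                } catch (e: Throwable) {
                    mine.completeExceptionally(e)  // wake up waiters with the failure
                    throw e
                }
            }

            // Step 4: wait for the coroutine launched above.
            return mine.await()
        }
    }

    // Hypothetical computation: each value depends on the previous index.
    private suspend fun compute(index: Int): Int =
        if (index == 0) 1 else getResult(index - 1) + index
}

fun main(): Unit = runBlocking(Dispatchers.Default) {
    val memo = Memo(1_000, this)
    println(memo.getResult(999))
}
```

Because every waiter suspends in await() instead of recursing on the call stack, a long dependency chain becomes a chain of suspended coroutines rather than a deep stack.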

Hi, @Loke,
as @mtimmerm said, you need a CoroutineScope for the computation, and getResult should be a suspend function.

Therefore, starting from a requests array you can create a results list:

// Must run inside a CoroutineScope (async is an extension on it)
val results: List<Deferred<Value>> = requests.mapIndexed { index, data ->
  async(start = CoroutineStart.LAZY) {
    getResult(index)
  }
}

// optional: start the computations eagerly
results.forEach(Deferred<Value>::start)

// Somewhere in the code:
val result3 = results[3].await()
println(result3)

I chose mapIndexed, but some other function may fit your actual design better.

@mtimmerm I consider CompletableDeferred+launch harder to handle.

Yes, if you know beforehand that you’re going to need most or all of the values, then this is reasonable. I assume (since it doesn’t seem to be documented) that the parent scope can complete without requiring the unused lazy coroutines to start at all?

No, it cannot.
This is an open issue: "Unstarted lazy job prevents parent from completion" (Issue #1545, Kotlin/kotlinx.coroutines on GitHub). I proposed 1065 (see the issue).

Either the scope or the child coroutines have to be cancelled:

/* scope */cancel()
// or
results.forEach(Deferred<Value>::cancel)
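Putting the pieces together, a self-contained sketch might look like the following. The function thirdResultOnly, the squared-index getResult, and the string requests are hypothetical placeholders; only the lazy async, await, and cancel calls are the point.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real computation.
suspend fun getResult(index: Int): Int = index * index

suspend fun thirdResultOnly(requests: List<String>): Int = coroutineScope {
    // One lazy coroutine per request; nothing runs until start() or await().
    val results: List<Deferred<Int>> = requests.mapIndexed { index, _ ->
        async(start = CoroutineStart.LAZY) { getResult(index) }
    }

    // Starts and awaits only this one coroutine.
    val result3 = results[3].await()

    // As discussed above, unstarted lazy children would otherwise keep
    // this scope from completing, so cancel whatever was never needed.
    results.forEach { it.cancel() }
    result3
}

fun main(): Unit = runBlocking {
    println(thirdResultOnly(List(10) { "request $it" }))
}
```

Cancelling an already completed Deferred has no effect, so it is safe to cancel the whole list rather than tracking which entries were started.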

Thank you (and other people who have answered). This is really interesting and I will start to play around with this. This should give me enough to get started.

I do have two questions though:

Firstly, the method referred to as getResult in my simplified example is, in the actual project, a very central method that is part of an interface used all over the project. Making it suspending resulted in a very large fraction of the functions in the entire project becoming suspending as well. My question is: is there any drawback to marking 50% of the methods in a project as suspending? Is there a performance impact from suspending functions?

My second question is about the methods and classes mentioned: await, Deferred, etc. Are these not available in multiplatform projects?

Hi Loke,
what kind of method is getResult?
Does it perform synchronous or asynchronous I/O, or is it purely CPU-bound?

suspend fun has some drawbacks: it generates more classes and methods (Android limits these) and requires more object allocations (only if suspension actually happens).

If you plan to switch to async, then coroutines can help your project. Otherwise, if you find the coroutine API helpful for this specific case only, you can use runBlocking as a bridge.

This is a multiplatform project. Currently I’m targeting the JVM and Linux, with JavaScript being a possibility at some point, once reflection is supported on JavaScript.

I don’t have any current plans to make this run on Android. It’s definitely not an impossibility, but I personally don’t feel like doing that.

My current problem is that coroutines on multiplatform seem to be very limited. As far as I can tell, not even await is available. This means that I will have to build some infrastructure around this using expect in some way.

Finally, the getResult method (actually called valueAt in the project; see array/types.kt on the master branch of lokedhs/array on GitHub) currently never blocks.

Indeed there is. In most cases, a suspending function allocates a continuation object when it calls another suspending function. If the function doesn’t actually block and you don’t mind it holding onto its thread while it’s executing, then there’s no reason to force all the external callers to call it from a coroutine. You could wrap the internal suspend function with an external one that uses runBlocking.
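A minimal sketch of such a wrapper, assuming a made-up class Values with a trivial internal computation (the real valueAt would delegate to the memoized coroutine code instead):

```kotlin
import kotlinx.coroutines.runBlocking

class Values(private val data: IntArray) {
    // Internal suspending implementation (hypothetical placeholder body).
    private suspend fun valueAtSuspending(index: Int): Int = data[index] * 2

    // Blocking facade: callers do not need to be suspending functions.
    // The calling thread is held until the coroutine finishes.
    fun valueAt(index: Int): Int = runBlocking { valueAtSuspending(index) }
}

fun main() {
    val values = Values(intArrayOf(1, 2, 3))
    println(values.valueAt(2))
}
```

This keeps suspend out of the public interface, at the cost of blocking the caller's thread, so the facade should not itself be called from inside a coroutine. As far as I know, runBlocking is not offered on the JavaScript target, which would matter if that target is added later.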