Explicit dependencies for coroutines

I am currently refactoring some of my libraries, because it seems that current coroutines solve most of the problems I’ve tried to solve myself. The only question that remains is dependencies. I have a task graph, where a task can depend on one or more other tasks. All tasks are started lazily. Cancellation of a dependency should lead to cancellation of the task, but not vice versa (the dependency could be used by another task). Currently, I just inherit from Deferred and add an explicit dependency list to it. Is there a way to solve this problem via structured concurrency?

Hi @darksnake,
currently, cancelling a dependency causes a CancellationException on the await invocation, so if a task invokes dependency.await(), an exception is thrown and the task may be cancelled (this depends on the error handling).

Otherwise, if you always want to cancel a task as soon as possible, you can consider

    dependency.invokeOnCompletion { error -> if (error != null) task.cancel() }

I am assuming that each task and each dependency lives in a different coroutine scope.
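A minimal sketch of that one-way link, assuming the jobs are siblings under a SupervisorJob so that neither is the parent of the other. The helper name `linkDependency` is hypothetical and not part of kotlinx.coroutines; only `invokeOnCompletion`, `cancel` and `DisposableHandle` are library API.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: cancels `task` once `dependency` fails or is cancelled.
// The returned handle lets the caller remove the handler when it is no longer needed.
fun linkDependency(task: Job, dependency: Job): DisposableHandle =
    dependency.invokeOnCompletion { error ->
        // `error` is null on normal completion, non-null on failure or cancellation.
        if (error != null) task.cancel(CancellationException("Dependency failed", error))
    }

fun main() = runBlocking {
    // SupervisorJob: a cancelled child does not cancel its siblings.
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    val dependency = scope.launch { delay(10_000) }
    val firstTask = scope.launch { delay(10_000) }
    linkDependency(firstTask, dependency)

    // Cancelling the task leaves the dependency untouched.
    firstTask.cancel()
    firstTask.join()
    println("dependency still active: ${dependency.isActive}")

    // Cancelling the dependency cancels every task linked to it.
    val secondTask = scope.launch { delay(10_000) }
    linkDependency(secondTask, dependency)
    dependency.cancel()
    secondTask.join()
    println("second task cancelled: ${secondTask.isCancelled}")

    scope.cancel()
}
```

The handler stays registered on the dependency until the dependency completes, so for long-lived dependencies it may be worth calling `dispose()` on the returned handle when the task finishes.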

I’ve written something like this for now:

    val deferred = async(context + CoroutineMonitor(), start = CoroutineStart.LAZY) {
        // If one of the dependencies fails, it should drop the current scope, but not vice versa.
        dependencies.forEach {
            it.start()
        }
        supervisorScope { block() }
    }

The outer scope should be generated by the async builder. If join fails. The block should contain suspending calls to dependencies, so if any of them fails, the block itself fails, but it is really hard to tell just from looking at it whether it will behave correctly (I have not tested it yet). Also, creating a supervisor for each task seems rather expensive, since I intend to run quite a lot of those tasks.

I agree.
Moreover, I don’t know which scopes are created and how. However, please consider something like this:

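    import kotlinx.coroutines.*
    import kotlin.coroutines.CoroutineContext

    fun <T> CoroutineScope.asyncWithDependencies(
            context: CoroutineContext,
            dependencyJobs: List<Job> = emptyList(),
            block: suspend CoroutineScope.() -> T
    ): Deferred<T> = async(context) {
        dependencyJobs.forEach { job ->
            // Start lazy dependencies up front so they can run in parallel.
            job.start()
            // Cancel this coroutine if the dependency fails or is cancelled.
            job.invokeOnCompletion { error ->
                if (error != null) cancel(CancellationException("Dependency $job failed with error: ${error.message}"))
            }
        }

        // The block awaits the dependencies it actually uses.
        return@async block()
    }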

Yeah, it looks better. I will try it and check whether this completion handler impacts performance. I can always turn it off with a flag.

And I probably do not need to have a separate class to hold dependencies. I can create a coroutine context key to hold dependencies for progress computation. Looks good.
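A rough sketch of what such a context element could look like. The class name `Dependencies` and its `progress()` function are assumptions made for illustration; only `AbstractCoroutineContextElement` and `CoroutineContext.Key` come from the standard library.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Hypothetical context element that carries the declared dependencies of a task.
class Dependencies(val jobs: List<Job>) : AbstractCoroutineContextElement(Dependencies) {
    companion object Key : CoroutineContext.Key<Dependencies>

    // Fraction of dependencies that have completed (for any reason).
    fun progress(): Double =
        if (jobs.isEmpty()) 1.0 else jobs.count { it.isCompleted }.toDouble() / jobs.size
}

fun main() = runBlocking {
    val finished = launch { }                // completes almost immediately
    val pending = launch { delay(10_000) }   // still running when progress is read
    finished.join()

    // The element travels with the coroutine and can be read back from its context.
    val task = async(Dependencies(listOf(finished, pending))) {
        coroutineContext[Dependencies]?.progress()
    }
    println(task.await()) // 0.5

    pending.cancel()
}
```

Because the element is part of the context, the dependency list is also visible from outside through the job's scope or through any child coroutine that inherits the context.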

We have done something similar in our project. However, don’t you need to await completion of dependencies before invoking the dependant’s block?

No, we do not. The block contains the awaits internally. The idea of the preliminary start is that you do not start everything sequentially as it is encountered (thus negating the advantages of parallelism), but start everything up front and let it be computed in parallel, suspending only on the first await.
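A small sketch of the difference, assuming two lazily started dependencies that each take about 200 ms. The function name `sumInParallel` is made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical example: start all lazy dependencies first, then await them.
suspend fun sumInParallel(): Int = coroutineScope {
    val a = async(start = CoroutineStart.LAZY) { delay(200); 1 }
    val b = async(start = CoroutineStart.LAZY) { delay(200); 2 }

    // Without this loop, b would only start when b.await() is reached,
    // that is, after a has already finished.
    listOf(a, b).forEach { it.start() }

    // Suspend only when a result is actually needed.
    a.await() + b.await()
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println(sumInParallel()) // 3
    }
    // Expected to be closer to 200 ms than to 400 ms, since both delays overlap.
    println("elapsed: $elapsed ms")
}
```

Removing the `forEach { it.start() }` line keeps the result the same but makes the two delays run one after the other.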

But I don’t see where this first await happens. Shouldn’t it be something like this:

    dependencyJobs.forEach { job ->
        job.start()
        job.invokeOnCompletion { error ->
            if (error != null) cancel(CancellationException("Dependency $job failed with error: ${error.message}"))
        }
    }
    // Job only exposes join(); await() is available on Deferred.
    dependencyJobs.forEach { it.join() }

It happens inside the block. It is possible to declare a dependency and not use it.

Makes sense, I think. Thank you.

It does not work after all, because the produced Deferred is computed outside of the scope it was produced in. So I need to either use GlobalScope or produce a scope at the use site somehow.