Avoid extra Continuation parameter in suspending function

Avoid extra Continuation parameter in suspending function.

I’m participating in vertx-lang-kotlin project to enhance Kotlin coroutine’s integration (Basic coroutine support by fvasco · Pull Request #14 · vert-x3/vertx-lang-kotlin · GitHub).

The main Vert.x issue is to require an extra argument (Handler) for each asyncronous function (see pull request for detail).

Kotlin has the same issue and this might become an issue for interoperability (with other languages or framework): suspending function doesn’t return a state of computation and add an extra (unexpected?) parameter in signature.

Unfortunately I’m not really expert so choose the right path and understand the motivations deeply is hard for me.

For the function

suspend fun await() : Result

the JVM bytecode currently is

fun await(continuation : Continuation<Result>) : Object

but it may be better as

fun await() : Continuable<Result>
...
future.await().continue(myContinuation) // invocation example

where Continuable contain a fully, manageable state of computation?

What you suggesting is to return a future (async-style).

It is indeed better for interoperability with other JVM languages (including Java) but it is not better for Kotlin itself (if you design for Kotlin usage).

You can get some explanation as to why it is so for Kotlin in the design docs: coroutines-examples/kotlin-coroutines-informal.md at master · Kotlin/coroutines-examples · GitHub
I also gave a different take on that at the recent talk (no video yet, but will be soon – there is more there than shown on the slides) Introduction to Kotlin coroutines

However, for the purpose of interoperability you can always choose to expose a future. What kind of future that is going to be is completely up to you. For example, if you like JDK8 CompletableFuture, then you can add kotlinx-coroutines-jdk8 (see here) as a dependency (it adds just a few extra Kbs) and change the function:

suspend fun await(): Result {
  ... 
}

to

fun awaitAsync(): CompletableFuture<Result> = future {
  ...
}

You can even have both:

suspend fun await(): Result { ... } // your code is here, use it from Kotlin
fun awaitAsync() = future { await() } // use it from Java

The choice is yours.

Hi

but it is not better for Kotlin itself (if you design for Kotlin usage)

perhaps I explained it bad, sorry.

My proposal isn’t to modify Kotlin syntax, but bytecode generation, so return a Continuable instead to add an extra parameters (continue invocation is syntetized by compiler); in such case we gain better interoperability without modify nothing in Kotlin style.

You can get some explanation as to why it is so for Kotlin in the design docs

I read the document, probabily the affected parts are:

Observe, that async-style function composition is more verbose and error prone. If you omit .await() invocation in async-style example…

Compiler will garantee coroutine’s resume invoking the appropriate methods.

Compare how these styles scale for a big project using many libraries…

Kotlin standard library is another one in the party, using a new proprietary Continuation doesn’t avoid any extra class definition.

Compare their performance…

This is covered in my Vert.x proposal, but I can suppose that may be really a different context.
Definitely you can evalutate the problem better than me.

Suspending functions look strange from any language that does not support continuation-passing-style natively.

Kotlin supports CPS but I think that invoking a non Kotlin suspending function (eg: Scala function) looks likewise strange (please fix me if I get it wrong).
IMHO the problem is inside the function’s binary signature.

he async and suspending styles can be easily converted into one another using the primitives that we’ve already seen.
So, you can write suspending function in Kotlin just once, and then adapt it for interop with any style of promise/future with one line of code using an appropriate future{} coroutine builder function.

This is the Vert.x problem: write an extra line each time is really hassle; why don’t demand this to compiler?

However, for the purpose of interoperability you can always choose to expose a future.

Nice, but Kotlin interoperates really well with Java, so think twice before propose the bad way as default.
Why don’t return a Future as default and passing the Continuation to it?

You can even have both

Yes, I can, but I don’t consider pleasant have both.

I suppose that a main issue is a performance problem, it is a Vert.x team big fear (apart breaking change, but Kotlin has experimental library : )
Maybe a suspending function can produce a state machin implementing both Continuation and Continuable interfaces.

Futures are heavier than continuations in terms of implementation. If we change code generation to return a future, then performance when using suspending functions from Kotlin will suffer. The only plan we have in our sleeve to address your concerns is to follow the lead of other places where we needed to have JVM interoperability and solved it via different kinds of @JvmXXX annotation.

What we can do, is to add some kind of @JvmAsync annotation that forces compiler to generate an additional method for better Java interop that has the same name, does not have an extra parameter, and returns a future. What kind of future that should be is an open question. Maybe it should be somehow configuration. Discussion is welcome.

1 Like

Thanks for clarification.

At first glance this looks as an acceptable compromise.

As opposite, should be nice to address also Vert.x use case: having a nice way to call asynchronous function as a suspending function, but this look too invasive and out of scope for Kotlin language.

TL;DR make it pluggable

There are some consideration to do.

@JvmAsync affects annotated method
@JvmAsync affects all suspending methods in a class
@JvmAsync affects all suspending methods in an interface
I want to apply @JvmAsync to all public suspending method in the project if build target isn’t Android
and so on…

All above consideration are valids

On other side the effective Future implementation choice may become highly questionable, use CompletableFuture in Java 8 may be accettable but is unapplicable in Android.
Moreover a similar annotation @JsAsync should be considered.
Finally provide for free a cross-language compatibility for coroutine (like @ScalaCoroutine) can sound good.

Probably the best choice is no choice at all: realize a pluggable system to implement easly all of those. (kapt?)
You can provide some plugins for JVM CompletableFuture and JS Promise.

Extra consideration: @JvmOverloads may became another plug-in.

To avoid methods pollution and to throw away this interoperability problem from Kotlin compiler we can made a second build step to produce a separate artifact.

As example, my artifact `coreLib’ containing class:

class CoreDaoFactory(val endpoint : String) {

    suspend fun newUserDao() : UserDao

    private fun connect() = TODO()

}

can produce in a second build step a separate artifact containing a delegate method for each public method.

class CoreDaoFactory(val endpoint:String) {

    suspend fun newUserDao() : UserDao

}

class CoreDaoFactoryAsync {

    val endpoint : String ...

    val delegated : CoreDaoFactory ...

    constructor(endpoint : String) ...

    constructor(delegated : CoreDaoFactory) ...

    fun newUserDao() : Future<UserDaoAsync> = delegate{ newUserDao() }

    private fun <T> delegate(block : CoreDaoFactory.() -> T) : T = TODO()

}

Generated class doesn’t affect the Kotlin compilation step and the generated bytecode and it is really Java friendly.

Two-artifacts solution just looks too complicated to me. All in all, async programming is user-friendly if and only if your language supports it. We can have somewhat better interop with Java for async programming via @JvmAsync, but overthinking it is counter-productive. You should totally ditch Java and go full Kotlin with coroutines if you want easy-to-use async programming.

Another option is a @JvmSync annotation, that is method invocation acts like a runBlocking function.

The advantage of this solution is an acceptable default for the coroutine context, a reasonable stack-trace (almost until the suspending function invocation) and an easy to use interface (multi-threading way).
It would be valid:

@JvmSync fun main(vararg args : String) ...

Unfortunately an acceptable default isn’t always good and, probably, each invocation leads to a measurable overhead.

I don’t consider this a nice solution, but I wrote it only for completeness.

So I decided to switch my point of view: Java use Executor for multi-threading programming, so why don’t use a `Launcher’ for CSP?

Similar to Executor, a third part library may define:

public interface Launcher {
    <T> CompletableFuture<T> launch(Consumer<Continuation<T>> consumer);
}

and, on Java side:

Launcher launcher = ...some builder invocation...
Channel<String> channel = ...

Future<String> future = launcher.launch(continuation -> channel.receive(continuation));
System.out.println("Result " + future.get());

On the Java-interop annotation for runBlocking@JvmBlocking would be the good choice of name for the corresponding annotation. Also, we have support for suspend fun main on our plate via compiler plugin that will basically implicitly mark it as @JvmBlocking.

On the launcher note. We don’t need to introduce any new concepts or functions. Existing coroutine builders are designed to work from Java:

Try this:

import kotlin.coroutines.experimental.EmptyCoroutineContext;
import kotlinx.coroutines.experimental.BuildersKt;
import kotlinx.coroutines.experimental.CommonPool;
import kotlinx.coroutines.experimental.CoroutineStart;
import kotlinx.coroutines.experimental.channels.Channel;
import kotlinx.coroutines.experimental.channels.RendezvousChannel;

public class LaunchTest {
    public static void main(String[] args) throws InterruptedException {
        Channel<String> channel = new RendezvousChannel<>();
        // sender
        BuildersKt.launch(CommonPool.INSTANCE, CoroutineStart.DEFAULT, (scope, continuation) ->
            channel.send("Hello, coroutines world!", continuation));
        // receiver
        String result = BuildersKt.runBlocking(EmptyCoroutineContext.INSTANCE, (scope, continuation) ->
            channel.receive(continuation));
        // print result
        System.out.println(result);
    }
}

The only thing we might want to do, is to apply some @JvmOverloads and @JvmName for a nicer-looking Java API.

I looked a bit into this solution

String result = BuildersKt.runBlocking(EmptyCoroutineContext.INSTANCE, (scope, continuation) ->
    channel.receive(continuation));

it looks nice, at first glance the scope variable looks unuseful.
An overload with EmptyCoroutineContext.INSTANCE as default should be good.

BuildersKt.launch is a bit more tricky, Job and Deferred should work on JVM an JS, and this is nice, really.
On other side both have suspending function, so I cannot call directly await function.
FutureKt.future includes others use case.

Finally Continuation isn’t a Functional Interface, so we can consider to deploy something like

interface FunctionalContinuation <T> {
    void resume( value : T, throwable : Throwable )
}

This is really more comfortable: it can be a lambda and returns nothing (instead of
Unit.INSTANCE).
This interface can’t be written in Kotlin, so the Java version can’t be transpiled to JS.

We require an extra builder

BuilterKt.continuation(functionalContinuation : FunctionalInterface)

So writing suspension callback directly -to avoid extra allocation- is verbose.

@JvmOverloads will solve the issue of defaults. You cannot directly use Job.await from Java, but you can use it via runBlocking or via future (both are available from Java and would be quite nice to use with proper defaults).

CoroutineScope is indeed kind of a hack there, even when you use it from Kotlin. We’ll try to get rid of it before coroutines design is finalised.

With respect to Continuation we a tentatively thinking about replacing two resume methods with one resume method in the final design, but that is driven by completely different issues that then issue of making it a functional interface. Why would you want Continuation to be a functional interface anyway? What advantage does it give? Can you elaborate?

I don’t think that the current Continuation interface is wrong, this is out of the scope of this considerations.

I, a Java programmer, want to consume all items in a Channel, so I need to write a Consumer for consumeEach method, Java doesn’t have any CSP concept so I need simply to write a function to elaborate all items.
Unfortunately the consumer interface isn’t Consumer but Continuation, so I must instantiate an anonymous inner class, this doesn’t make me happy.

In my point of view -Java programmer- a Channel is quite similar to a Reactive Flowable, however Channel has a ugly API because I can’t manage asynchronous method using a lamba.

Without further consideration about what is the right interface for Continuation, the Continuation interface looks externally like a Result Handler in asynchronous programming, using callback-style can lead to callback-hell, writing two methods for each callback looks really, really unaccetable.

This consideration are valids only invoking suspending methods in any languages except Kotlin 1.1.

You can use consumeEach from Java. Given this kotlin definition in PTest.kt:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.channels.produce

fun nums() = produce(CommonPool) {
    repeat(5) { i ->
        send(i)
    }
}

Try running the following Java code:

import kotlin.Unit;
import kotlin.coroutines.experimental.EmptyCoroutineContext;
import kotlinx.coroutines.experimental.BuildersKt;
import kotlinx.coroutines.experimental.channels.ChannelsKt;
import kotlinx.coroutines.experimental.channels.ProducerJob;

public class PConsume {
    public static void main(String[] args) throws InterruptedException {
        ProducerJob<Integer> nums = PTestKt.nums();
        BuildersKt.runBlocking(EmptyCoroutineContext.INSTANCE, (scope, cont) ->
            ChannelsKt.consumeEach(nums, (i, cont2) -> {
                System.out.println("Received " + i);
                return Unit.INSTANCE;
            }, cont)
        );
    }
}

However, it is not really convenient, as you can see, not it does make sense to improve it, since there is RxJava that was specifically designed to help writing async code for languages without built-in coroutines support, so if you need to consume a Channel from Java I’d recommend to convert it to some reactive stream first and the consume it. Here is an example:

import io.reactivex.Observable;
import kotlinx.coroutines.experimental.CommonPool;
import kotlinx.coroutines.experimental.channels.ProducerJob;
import kotlinx.coroutines.experimental.rx2.RxConvertKt;

public class PConsumeRx {
    public static void main(String[] args)  {
        ProducerJob<Integer> nums = PTestKt.nums();
        Observable<Integer> observable = RxConvertKt.asObservable(nums, CommonPool.INSTANCE);
        observable.subscribe(i -> {
            System.out.println("Received " + i);
        });
    }
}
1 Like