Syntax for fluent co-routines

Hi,

This is my 3rd day into Kotlin and I’m so far loving the language.
I’ve been reading the coroutine documentation but can’t find an idiomatic answer for what I’m trying to do.

I’d like to chain co-routines together in a fluent style – the co-routines have enough state that they should be wrapped in a class.

Something like:

   CoRoutineClass1("some param")
   .pipeTo(CoRoutineClass2("another param"))
   .pipeTo(CoRoutineClass3("another param"))
   .start()

It seems like all the building blocks are there, but I’m having a hard time putting them together in this fashion.
Any suggestions?

I took a stab at implementing this with extension functions. Many of the classes in the co-routine library are marked internal, which makes this a little more difficult. Unfortunately, this implementation doesn’t work: the channels seem to get clogged up because the consumers aren’t active yet. I don’t have a good solution for this yet.

Any suggestions on how this could be done without a new feature request?

class TransformerScope(val inChannel: ReceiveChannel<Int>, val outChannel: SendChannel<Int>)
class ConsumerScope(val inChannel: ReceiveChannel<Int>)

fun ReceiveChannel<Int>.transformWith(block: suspend TransformerScope.() -> Unit): ReceiveChannel<Int> {
    val scope = TransformerScope(this, Channel<Int>())
    return produce<Int> {
        block(scope)
    }
}

fun ReceiveChannel<Int>.consumeWith(block: suspend ConsumerScope.() -> Unit): Deferred<Unit> {
    val scope = ConsumerScope(this)
    return async {
        block(scope)
    }
}

Using these extension functions:

fun main(args: Array<String>) = runBlocking<Unit> {

    val pipeline = produce<Int> {
        println("Producer1 is working in thread ${Thread.currentThread().name}")
        for (i in 1..10) {
            println("Sending " + i.toString())
            send(i)
            delay(300L)
        }
    }.transformWith {
        println("Transformer is working in thread ${Thread.currentThread().name}")
        inChannel.consumeEach {
            println("Transforming " + it.toString())
            outChannel.send(it * 2)
        }
    }.consumeWith {
        inChannel.consumeEach {
            println("Consuming " + it.toString())
        }
    }.await()
}

This produces the result:

Producer1 is working in thread ForkJoinPool.commonPool-worker-1
Sending 1
Transformer is working in thread ForkJoinPool.commonPool-worker-2
Transforming 1
Sending 2

/// Hangs here
Process finished with exit code 1

Of course, I answered my own question 5 minutes after I asked.

The transformer was sending to a separately created Channel<Int>() rather than the channel that the produce function creates in its scope, so the messages never reached the consumer. This change fixed it:

fun ReceiveChannel<Int>.transformWith (block: suspend TransformerScope.() -> Unit ) : ReceiveChannel<Int> {
    val inChannel = this
    return produce<Int> {
        // Use the channel owned by produce as the transformer's output
        block(TransformerScope(inChannel, channel))
    }
}
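
With that fix, the class-based fluent style from my original question seems reachable too. Below is a rough sketch, not a definitive implementation. It assumes a recent kotlinx.coroutines release (non-experimental package names, produce as an extension on CoroutineScope), and the names Stage, Multiplier, Pipeline, pipeTo, collect and fluentDemo are all made up for illustration; none of them are part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stage type: reads from `input` and writes to `output`.
interface Stage<I, O> {
    suspend fun process(input: ReceiveChannel<I>, output: SendChannel<O>)
}

// Example stage that carries its own state (the factor) in a class.
class Multiplier(private val factor: Int) : Stage<Int, Int> {
    override suspend fun process(input: ReceiveChannel<Int>, output: SendChannel<Int>) {
        for (value in input) output.send(value * factor)
    }
}

// Hypothetical fluent wrapper around the channel produced by the latest stage.
class Pipeline<T>(private val scope: CoroutineScope, private val source: ReceiveChannel<T>) {
    @OptIn(ExperimentalCoroutinesApi::class)
    fun <R> pipeTo(stage: Stage<T, R>): Pipeline<R> =
        // Hand the stage the channel owned by produce, so its output reaches the next stage.
        Pipeline(scope, scope.produce<R> { stage.process(source, channel) })

    // Drains the final channel into a list.
    suspend fun collect(): List<T> {
        val result = mutableListOf<T>()
        for (value in source) result += value
        return result
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun fluentDemo(): List<Int> = coroutineScope {
    val numbers = produce<Int> { for (i in 1..5) send(i) }
    Pipeline(this, numbers)
        .pipeTo(Multiplier(2))
        .pipeTo(Multiplier(10))
        .collect()
}

fun main() = runBlocking<Unit> {
    println(fluentDemo())
}
```

Each pipeTo call starts the next stage right away as a child of the given scope, so no separate start() call is needed in this sketch; the terminal collect() is what drives the elements through.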

It seems to me that the kotlinx.coroutines library already provides most of the corresponding “fluent” operators (see map, filter, and the other extensions on ReceiveChannel), so your example can be rewritten as follows without introducing any new operators:

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val pipeline = produce<Int> {
        println("Producer1 is working in thread ${Thread.currentThread().name}")
        for (i in 1..10) {
            println("Sending " + i.toString())
            send(i)
            delay(300L)
        }
    }.map {
        println("Transforming " + it.toString())
        it * 2
    }.consumeEach {
        println("Consuming " + it.toString())
    }
}

The only missing piece seems to be the ability to say “Transformer is working…” at the beginning of the map operation, which relates to a general desire to support free-form transformations. Maybe we need some kind of transform operator, too; see issue #257 in Kotlin/kotlinx.coroutines on GitHub, “Free-form transform operation on ReceiveChannel”.
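
To make the idea concrete, here is one way such a free-form operator could be sketched in user code. This is only an illustration under assumptions: it targets a recent kotlinx.coroutines release (produce as an extension on CoroutineScope), and the transform function, its scope parameter and transformDemo are hypothetical names, not the API proposed in the issue.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical operator: the block gets the upstream channel as its argument
// and sends results through the ProducerScope receiver created by produce.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> ReceiveChannel<E>.transform(
    scope: CoroutineScope,
    block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R> {
    val upstream = this
    return scope.produce<R> { this.block(upstream) }
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun transformDemo(): List<Int> = coroutineScope {
    val numbers = produce<Int> { for (i in 1..3) send(i) }
    val doubled = numbers.transform<Int, Int>(this) { input ->
        // Free-form code can run once, before any element is processed.
        println("Transformer is working in thread ${Thread.currentThread().name}")
        input.consumeEach { send(it * 2) }
    }
    val result = mutableListOf<Int>()
    for (value in doubled) result += value
    result
}

fun main() = runBlocking<Unit> {
    println(transformDemo())
}
```

Unlike map, the block here controls the whole loop, so it can emit zero, one, or many elements per input and do setup work before consuming.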