[coroutines] Channel data transformation (similar to Rx). onClose callback?



We just started using coroutines in our Android app. We are switching from Rx and want to remove it completely. We have the following situation:

Our database can listen for changes to a query, and we would like to make use of that. So our repositories look something like this:

fun getTasks(): Channel<List<Task>> {
    val c = Channel<List<Task>>()
    val query = ...
    query.onChange { changes ->
        launch {
            ...
        }
    }
    return c
}

Our use cases need to transform that data in some way (e.g. split it based on some criteria):

fun execute(): Channel<Schedule> {
    val nc = Channel<Schedule>()
    launch {
        getTasks().consumeEach { tasks ->
            nc.send(Schedule(tasks, tasks))
        }
    }
    return nc
}

All we’re doing is transforming the data. Is there a better or easier way to do this without creating new channel(s)?
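
One possible direction, sketched here under assumptions rather than as a confirmed answer: if the repository exposed a `Flow` from kotlinx.coroutines instead of a `Channel`, the transformation could be a plain `map` with no intermediate channel and no `launch`. In the sketch below, `Task` and `Schedule` are hypothetical stand-ins for our real types, and `getTasks()` emits fixed lists in place of the database query.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the app's real types.
data class Task(val id: Int)
data class Schedule(val first: List<Task>, val second: List<Task>)

// Fake repository (assumption): emits two fixed lists instead of observing a database query.
fun getTasks(): Flow<List<Task>> =
    flowOf(listOf(Task(1)), listOf(Task(1), Task(2)))

// Use case: transforms each emitted list; no new channel is created by hand.
fun execute(): Flow<Schedule> =
    getTasks().map { tasks -> Schedule(tasks, tasks) }

fun main() = runBlocking {
    // Collect every Schedule and print how many tasks each one carries.
    val schedules = execute().toList()
    println(schedules.map { it.first.size })
}
```

Whether this fits depends on how the database listener can be wrapped; the sketch only shows the shape of the transformation step.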

My second (two-part) question concerns closing channels. Can we close all channels along the created chain with a single call to close? And can we receive a callback when the Channel in our repository is closed, so that we can do something like this:

c.onClose {
    ...
}
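
For comparison, kotlinx.coroutines has an experimental `invokeOnClose` function on `SendChannel` that looks close to the callback asked about here. The following is a minimal sketch, assuming a hypothetical helper `channelWithCloseCallback` (not part of the library) and a boolean flag standing in for whatever cleanup the repository would do. Note that only one handler can be registered per channel.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: creates a channel and runs `onClosed` once the channel
// is closed (or its receiving side is cancelled).
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> channelWithCloseCallback(onClosed: () -> Unit): Channel<T> {
    val c = Channel<T>()
    // invokeOnClose passes the close cause (or null); it is ignored in this sketch.
    c.invokeOnClose { onClosed() }
    return c
}

fun main() {
    var listenerRemoved = false // stands in for removing the database listener
    val c = channelWithCloseCallback<Int> { listenerRemoved = true }
    c.close()
    println(listenerRemoved)
}
```

This covers only the callback half of the question; it does not by itself close other channels further down the chain.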