Iterate over asynchronous callbacks in coroutine

Hello,

I’m using KMongo, which is wrapped around mongodb-driver-async. There is a Collection method

FindIterable<TDocument> find(Bson filter);

which returns Iterable with asynchronously delivered results. I’d like to iterate over it sequentially inside a coroutine. There is my solution

(inside suspend function)

val channel = Channel<Action>()
GlobalScope.launch { 
    collection.find(filter).forEach {
        runBlocking { channel.send(it) }
    }
    channel.close()
}

for (action in channel) {
    process(action) // do some processing 
}

I have few concerns about it:

  1. Is this the correct use of runBlocking? I basically need to pass suspend block when method expects just block
  2. Is this the correct use of channels? Maybe there is a better solution, like some suspendCoroutine or so?
  3. About GlobalScope - is there are some common rulez to decide which Scope should be used? Can it be inherited from suspend function it is placed?

EDIT

I have realized, that I oversimplified my example, so it contain almost no sense.
My goal was to create another find method, that returns a Channel, which can be iterated over later.
Piece of real code:

override suspend fun find(filter: String): ReceiveChannel<Action> {
    val channel = Channel<Action>()
    GlobalScope.launch {
        collection.find(filter).forEach {
            runBlocking { channel.send(it) }
        }
        channel.close()
    }
    return channel
}

Is this the correct use of runBlocking ? I basically need to pass suspend block when method expects just block

No blocking code inside a coroutine, if possible. Using an async client it have to be possible.

Is this the correct use of channels? Maybe there is a better solution, like some suspendCoroutine or so?

You have to search for an async forEach method.

About GlobalScope - is there are some common rulez to decide which Scope should be used?

You should not use start a new coroutine for a forEach, however take a look here: http://kotlinlang.org/docs/reference/coroutines/basics.html#structured-concurrency

Can it be inherited from suspend function it is placed?

Use override suspend fun CoroutineScope.find(filter: String): ReceiveChannel<Action>, but not in this use case :slight_smile:

Thank you for your answer!

No blocking code inside a coroutine, if possible.

Yep, I know this rule. However, are there any potential problems in exact

runBlocking { channel.send(it) }

Because what I want here is run suspend lambda.

You have to search for an async forEach method.

I checked, I actually use async version of forEach:

suspend fun <T> FindIterable<T>.forEach(block: (T) -> Unit) = singleResult<Void> { forEach(Block { item -> block(item) }, it) }

Where singleResult is typical callback->coroutine wrapper. However, its limitation is it still requires non-suspend block as parameter.

You can use the batchCursor to implement an asynchronous forEach

http://api.mongodb.com/java/3.0/com/mongodb/async/client/MongoIterable.html#batchCursor-com.mongodb.async.SingleResultCallback-

I’m having a similar problem: trying to wrap a callback-based subscription API using coroutines. More specifically, the initial API provides a subscribe method taking a callback as input, and calls that callback repeatedly whenever there is a new value coming up.

fun SomeApi.subscribe(destination: String, callback: (String) -> Unit)

I’d like to provide a similar method that takes no callback, but instead returns a ReceiveChannel:

class WrappedApi(val api : SomeApi) {

    fun <T> CoroutineScope.subscribe(destination: String): ReceiveChannel<T> = produce {
        api.subscribe(destination) {
            launch { send(it) }
        }
    }
}

I don’t see how you can get around using a coroutine builder inside the callback, since the callback doesn’t accept suspend functions. And of course the suspendCoroutine option using the continuation doesn’t work here, since it needs to be called multiple times.

Is there a better approach?

Use sendBlocking instead

Oh, I didn’t know about this API. Is there any difference from using runBlocking { send(it) }?

That being said, I’m currently working on a multiplatform library and this function does not exist as a common function (because we can’t block JS’s unique thread, so it’s JVM-only).

I will probably have to apply the same runBlocking-equivalent as in unit tests (using GlobalScope.promise), or do you know any alternative?

@joffrey.bion
If your idea does not work as expected, my last suggestion is to use offer on an unlimited channel, this turns off back pressure at all.