Iterate over asynchronous callbacks in coroutine


#1

Hello,

I’m using KMongo, which wraps mongodb-driver-async. There is a collection method

FindIterable<TDocument> find(Bson filter);

which returns an iterable whose results are delivered asynchronously. I’d like to iterate over it sequentially inside a coroutine. Here is my solution

(inside a suspend function)

val channel = Channel<Action>()
GlobalScope.launch { 
    collection.find(filter).forEach {
        runBlocking { channel.send(it) }
    }
    channel.close()
}

for (action in channel) {
    process(action) // do some processing 
}

I have a few concerns about it:

  1. Is this the correct use of runBlocking? I basically need to pass a suspend block where the method expects a plain block.
  2. Is this the correct use of channels? Maybe there is a better solution, like suspendCoroutine or similar?
  3. About GlobalScope: are there common rules for deciding which scope should be used? Can it be inherited from the suspend function it is placed in?

EDIT

I have realized that I oversimplified my example, so it makes almost no sense.
My goal was to create another find method that returns a Channel, which can be iterated over later.
A piece of the real code:

override suspend fun find(filter: String): ReceiveChannel<Action> {
    val channel = Channel<Action>()
    GlobalScope.launch {
        collection.find(filter).forEach {
            runBlocking { channel.send(it) }
        }
        channel.close()
    }
    return channel
}

#2

Is this the correct use of runBlocking? I basically need to pass a suspend block where the method expects a plain block.

No blocking code inside a coroutine, if possible. With an async client, it has to be possible.

Is this the correct use of channels? Maybe there is a better solution, like suspendCoroutine or similar?

You have to search for an async forEach method.

About GlobalScope: are there common rules for deciding which scope should be used?

You should not start a new coroutine for a forEach; however, take a look here: http://kotlinlang.org/docs/reference/coroutines/basics.html#structured-concurrency

Can it be inherited from the suspend function it is placed in?

Use override suspend fun CoroutineScope.find(filter: String): ReceiveChannel<Action>, but not in this use case :)
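One way to read the CoroutineScope suggestion is a channel built with produce from kotlinx.coroutines, sketched below. This is only an illustration: the Action class, the findActions name, and the in-memory list standing in for the Mongo collection are assumptions made so the example runs on its own, and produce is still marked experimental.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Action document type from the question.
data class Action(val name: String)

// The producer coroutine is a child of the receiver scope, so cancelling
// that scope also cancels the producer and closes the channel.
// `source` is an assumed in-memory replacement for the Mongo query.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.findActions(source: List<Action>): ReceiveChannel<Action> = produce {
    for (action in source) {
        send(action) // send suspends inside produce, so no runBlocking is needed
    }
    // The channel is closed automatically when this block completes.
}

fun main(): Unit = runBlocking {
    val channel = findActions(listOf(Action("a"), Action("b")))
    for (action in channel) {
        println(action.name)
    }
}
```

Because findActions is an extension on CoroutineScope, the caller decides the lifetime of the producer instead of leaving it in GlobalScope.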


#3

Thank you for your answer!

No blocking code inside a coroutine, if possible.

Yep, I know this rule. However, are there any potential problems with exactly this:

runBlocking { channel.send(it) }

What I want here is to run a suspend lambda.

You have to search for an async forEach method.

I checked, and I am actually using the async version of forEach:

suspend fun <T> FindIterable<T>.forEach(block: (T) -> Unit) =
    singleResult<Void> { forEach(Block { item -> block(item) }, it) }

Here singleResult is a typical callback-to-coroutine wrapper. Its limitation, however, is that it still requires a non-suspend block as a parameter.
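For readers unfamiliar with this kind of wrapper, here is a minimal sketch built on suspendCoroutine from the standard library. It is not KMongo's actual singleResult: the SingleResultCallback interface is redeclared locally with the same onResult(result, t) shape as the driver's, and awaitResult and countAsync are hypothetical names used only for illustration.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Local stand-in shaped like the driver's SingleResultCallback (assumption:
// declared here so the sketch compiles without the MongoDB driver).
interface SingleResultCallback<T> {
    fun onResult(result: T?, t: Throwable?)
}

// Suspends the caller, starts the callback-style operation, and resumes
// with the result or the error once the callback fires.
suspend fun <T> awaitResult(start: (SingleResultCallback<T>) -> Unit): T? =
    suspendCoroutine { cont ->
        start(object : SingleResultCallback<T> {
            override fun onResult(result: T?, t: Throwable?) {
                if (t != null) cont.resumeWithException(t) else cont.resume(result)
            }
        })
    }

// Hypothetical callback-style API, used only to exercise the wrapper.
fun countAsync(callback: SingleResultCallback<Long>) = callback.onResult(42L, null)

suspend fun main() {
    val count = awaitResult<Long> { countAsync(it) }
    println(count) // prints 42
}
```

In a wrapper like this, only the final result crosses back into the coroutine. Any per-item block handed to the driver runs inside the driver's callback, outside the coroutine, which is why it cannot call suspend functions.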


#4

You can use batchCursor to implement an asynchronous forEach:

http://api.mongodb.com/java/3.0/com/mongodb/async/client/MongoIterable.html#batchCursor-com.mongodb.async.SingleResultCallback-
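A rough sketch of that idea follows, assuming the cursor hands out one batch per callback and signals the end with a null batch. The SingleResultCallback and AsyncBatchCursor interfaces are local stand-ins shaped like the driver's types, and awaitResult, forEachSuspending, and FakeCursor are hypothetical names introduced so the example runs without the driver.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Local stand-ins shaped like the driver's callback and cursor types
// (assumption: declared here so the sketch runs on its own).
interface SingleResultCallback<T> {
    fun onResult(result: T?, t: Throwable?)
}

interface AsyncBatchCursor<T> {
    fun next(callback: SingleResultCallback<List<T>>) // null result: no more batches
    fun close()
}

// Bridges a single callback-style call into a suspending call.
suspend fun <T> awaitResult(start: (SingleResultCallback<T>) -> Unit): T? =
    suspendCoroutine { cont ->
        start(object : SingleResultCallback<T> {
            override fun onResult(result: T?, t: Throwable?) {
                if (t != null) cont.resumeWithException(t) else cont.resume(result)
            }
        })
    }

// Pulls batches one at a time. `block` may suspend because it runs in the
// calling coroutine between batches, not inside the driver callback.
suspend fun <T> AsyncBatchCursor<T>.forEachSuspending(block: suspend (T) -> Unit) {
    try {
        while (true) {
            val batch = awaitResult<List<T>> { next(it) } ?: break
            for (item in batch) block(item)
        }
    } finally {
        close()
    }
}

// Hypothetical in-memory cursor used only to exercise the loop.
class FakeCursor<T>(batches: List<List<T>>) : AsyncBatchCursor<T> {
    private val remaining = batches.iterator()
    var closed = false
        private set

    override fun next(callback: SingleResultCallback<List<T>>) =
        callback.onResult(if (remaining.hasNext()) remaining.next() else null, null)

    override fun close() {
        closed = true
    }
}

suspend fun main() {
    val seen = mutableListOf<Int>()
    FakeCursor(listOf(listOf(1, 2), listOf(3))).forEachSuspending { seen += it }
    println(seen) // prints [1, 2, 3]
}
```

With a loop like this, the body could call channel.send directly, since each item is processed in the coroutine rather than in a driver callback.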