I’m using KMongo, which is a wrapper around mongodb-driver-async. The collection has a method
FindIterable<TDocument> find(Bson filter);
which returns an iterable whose results are delivered asynchronously. I’d like to iterate over it sequentially inside a coroutine. Here is my solution
(inside a suspend function):
val channel = Channel<Action>()
GlobalScope.launch {
    collection.find(filter).forEach {
        runBlocking { channel.send(it) }
    }
    channel.close()
}
for (action in channel) {
    process(action) // do some processing
}
I have a few concerns about it:
Is this a correct use of runBlocking? I basically need to pass a suspend block where the method expects a plain block.
Is this a correct use of channels? Maybe there is a better solution, such as suspendCoroutine or something similar?
About GlobalScope: are there common rules for deciding which scope should be used? Can it be inherited from the suspend function the code is placed in?
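On the scope question, here is a minimal sketch, assuming kotlinx.coroutines is available. `coroutineScope` creates a scope that inherits the context of the calling suspend function, so GlobalScope is not required. The helper name `collectSequentially` and the list standing in for the query results are hypothetical and only illustrate the scoping, not the MongoDB callback.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: `source` stands in for the query results.
suspend fun collectSequentially(source: List<String>): List<String> = coroutineScope {
    val channel = Channel<String>()
    // launch runs in the scope created by coroutineScope, so the producer is a
    // child of the caller and is cancelled together with it.
    launch {
        for (item in source) channel.send(item)
        channel.close()
    }
    val processed = mutableListOf<String>()
    // Consume sequentially; the loop ends when the channel is closed.
    for (item in channel) processed += item.uppercase()
    processed
}

fun main() = runBlocking {
    println(collectSequentially(listOf("a", "b")))
}
```

`coroutineScope` returns only after the launched producer has finished, which keeps the work tied to the caller rather than to a global scope.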
EDIT
I have realized that I oversimplified my example, so it made almost no sense.
My goal was to create another find method that returns a Channel, which can be iterated over later.
Piece of real code:
I’m having a similar problem: trying to wrap a callback-based subscription API using coroutines. More specifically, the initial API provides a subscribe method taking a callback as input, and calls that callback repeatedly whenever there is a new value coming up.
fun SomeApi.subscribe(destination: String, callback: (String) -> Unit)
I’d like to provide a similar method that takes no callback, but instead returns a ReceiveChannel:
class WrappedApi(val api: SomeApi) {
    fun <T> CoroutineScope.subscribe(destination: String): ReceiveChannel<T> = produce {
        api.subscribe(destination) {
            launch { send(it) }
        }
    }
}
I don’t see how you can get around using a coroutine builder inside the callback, since the callback doesn’t accept suspend functions. And of course the suspendCoroutine option using the continuation doesn’t work here, since it needs to be called multiple times.
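The single-resume limitation mentioned above can be seen with the standard library alone. This is only a sketch: `fakeSubscribe` is a hypothetical stand-in for a callback API that fires twice, and `resumeTwice` is an illustrative helper, not part of any library.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback API that delivers two values.
fun fakeSubscribe(callback: (String) -> Unit) {
    callback("first")
    callback("second")
}

// Returns the value the coroutine received and the error raised by the second resume.
fun resumeTwice(): Pair<String?, Throwable?> {
    var received: String? = null
    var error: Throwable? = null
    val block: suspend () -> String = {
        suspendCoroutine { cont ->
            fakeSubscribe { value ->
                try {
                    cont.resume(value) // only the first call is accepted
                } catch (e: IllegalStateException) {
                    error = e // the second call fails: the continuation was already resumed
                }
            }
        }
    }
    // Start the coroutine with a completion that records its result.
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        received = result.getOrNull()
    })
    return received to error
}
```

Only the first value reaches the coroutine, which is why a repeated callback needs a channel or a similar multi-value construct.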
Oh, I didn’t know about this API. Is there any difference from using runBlocking { send(it) }?
That being said, I’m currently working on a multiplatform library, and this function does not exist in common code (we can’t block the single JS thread, so it’s JVM-only).
I will probably have to apply the same runBlocking-equivalent as in unit tests (using GlobalScope.promise), or do you know any alternative?
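One possible non-blocking alternative, sketched under the assumption that kotlinx.coroutines 1.5 or later is in use: `trySend` is not a suspend function and is declared in common code, so it can be called directly from the callback. `FakeApi` and `subscribeAsChannel` are hypothetical names standing in for the API described earlier in the thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical stand-in for the callback-based subscription API.
class FakeApi {
    private val callbacks = mutableListOf<(String) -> Unit>()

    fun subscribe(destination: String, callback: (String) -> Unit) {
        callbacks += callback
    }

    // Simulates the API delivering a new value to every subscriber.
    fun emit(value: String) {
        callbacks.forEach { it(value) }
    }
}

fun FakeApi.subscribeAsChannel(destination: String): ReceiveChannel<String> {
    // An unlimited buffer means trySend never fails while the channel is open,
    // at the cost of unbounded memory if the consumer is slow.
    val channel = Channel<String>(Channel.UNLIMITED)
    subscribe(destination) { value ->
        channel.trySend(value) // non-suspending, so no coroutine builder is needed
    }
    return channel
}
```

The trade-off is that there is no backpressure, and since the API as described has no unsubscribe method, this sketch never closes the channel.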