Kotlin coroutines and Java CompletableFuture integration

I usually use the standard kotlinx-coroutines-jdk8 library to jump from the Java *Future API world into Kotlin’s suspend heaven.

That worked great for me until I encountered the Neo4j cursor API, where I can’t call .await() on the completion stage, because it immediately starts fetching millions of records into memory.

The Kotlin way does not work for me:

suspend fun query() {
    driver.session().use { session ->

        val cursor: StatementResultCursor = session.readTransactionAsync {
            it.runAsync("query ...", params)
        }.await()

        var record = cursor.nextAsync().await()

        // THE CODE BELOW IS NEVER REACHED, WE HAVE ALREADY DIED WITH OOM
        while (record != null) {
            val node = record.get("node")
            mySuspendProcessingFunction(node)
            record = cursor.nextAsync().await()
        }
    }
}

At the same time, the Java API works well, and we fetch records one by one:

suspend fun query() {
    session.readTransactionAsync { transaction ->
        transaction.runAsync("query ...", params).thenCompose { cursor ->
            cursor.forEachAsync { record ->
                runBlocking { // BUT I NEED TO DO RUN BLOCKING HERE :(
                    val node = record.get("node")
                    mySuspendProcessingFunction(node)
                }
            }
        }
    }.thenCompose {
        session.closeAsync()
    }.await()
}

The second option works for me, but it is pretty ugly and definitely not the Kotlin way. More importantly, I need to use runBlocking, even though this whole block is executed within a suspend function.

What am I doing wrong? Is there a better way?

Hi @sqshq
You can experiment with Kotlin Flow using nextAsync.

Take a look here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html

You can consider something like:

fun StatementResultCursor.asFlow() = flow {
  do {
    val record = nextAsync().await()
    if (record != null) emit(record)
  } while (record != null)
}

Thank you @fvasco, I’ve tried that. Unfortunately, the result is the same: we still fail with OOM very quickly.

Here is the code:

suspend fun query() {
    session.readTransactionAsync { transaction ->
        transaction.runAsync(query, params).thenApply { cursor ->
            cursor.asFlow().onEach { record ->
                val node = record.get("node")
                mySuspendProcessingFunction(node)
            }
        }
    }.thenCompose {
        session.closeAsync()
    }.await()
}

fun StatementResultCursor.asFlow() = flow {
    do {
        val record = nextAsync().await()
        if (record != null) emit(record)
    } while (record != null)
}

Am I missing something? Maybe I’m not using Flow correctly?

Hi @sqshq
Can you modify your Java example to fetch only the first result using nextAsync?

I don’t know the Neo4j library. In your first example, the while (record != null) is an infinite loop.

@sqshq, you should use collect instead of onEach.
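
To illustrate the collect vs. onEach point: a flow built with flow { } is cold, and onEach is an intermediate operator that only describes a step, so nothing is pulled from the cursor until a terminal operator such as collect runs. Below is a minimal sketch of that behaviour. FakeCursor is a hypothetical stand-in for the driver's cursor (it is not the Neo4j API), and the sketch assumes a recent kotlinx.coroutines version where CompletionStage.await() and the lambda form of collect are available.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the driver's cursor: hands out one record per call,
// then null once the results are exhausted.
class FakeCursor(private val records: Iterator<String>) {
    // Number of records actually handed out so far.
    var fetched = 0
        private set

    fun nextAsync(): CompletionStage<String?> {
        val next = if (records.hasNext()) records.next() else null
        if (next != null) fetched++
        return CompletableFuture.completedFuture(next)
    }
}

// Same idea as the asFlow() extension in the thread, written against FakeCursor.
fun FakeCursor.asFlow(): Flow<String> = flow {
    while (true) {
        val record = nextAsync().await() ?: break
        emit(record)
    }
}

fun main(): Unit = runBlocking {
    val lazyCursor = FakeCursor(listOf("a", "b", "c").iterator())
    // onEach returns a new Flow; without a terminal operator nothing is fetched.
    lazyCursor.asFlow().onEach { println("never printed: $it") }
    check(lazyCursor.fetched == 0)

    val cursor = FakeCursor(listOf("a", "b", "c").iterator())
    val seen = mutableListOf<String>()
    // collect is terminal: it suspends and pulls records one at a time.
    cursor.asFlow().collect { record -> seen.add(record) }
    check(seen == listOf("a", "b", "c"))
    check(cursor.fetched == 3)
    println("lazy fetched=${lazyCursor.fetched}, collected=$seen")
}
```

Because collect is a suspend function, it has to be called from a coroutine or another suspend function rather than from inside a thenApply callback. Whether that alone avoids the OOM depends on how the driver buffers results, which this sketch does not model.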