I usually use the standard kotlinx-coroutines-jdk8 library to jump from the world of Java's *Future APIs into Kotlin's suspend heaven.
This worked great for me until I ran into the Neo4j cursor API, where I can't call .await() on the completion stage, because it immediately starts fetching millions of records into memory.
The Kotlin way, shown here, does not work for me:
    suspend fun query() {
        driver.session().use { session ->
            val cursor: StatementResultCursor = session.readTransactionAsync {
                it.runAsync("query ...", params)
            }.await()
            var record = cursor.nextAsync().await()
            // THE CODE BELOW IS NEVER REACHED, WE ARE ALREADY DEAD WITH OOM
            while (record != null) {
                val node = record.get("node")
                mySuspendProcessingFunction(node)
                record = cursor.nextAsync().await()
            }
        }
    }
At the same time, the Java-style API works well and fetches records one by one:
    suspend fun query() {
        session.readTransactionAsync { transaction ->
            transaction.runAsync("query ...", params).thenCompose { cursor ->
                cursor.forEachAsync { record ->
                    runBlocking { // BUT I NEED TO USE runBlocking HERE :(
                        val node = record.get("node")
                        mySuspendProcessingFunction(node)
                    }
                }
            }
        }.thenCompose {
            session.closeAsync()
        }.await()
    }
The second option works for me, but it is pretty ugly and definitely not the Kotlin way. More importantly, I need to use runBlocking, even though this whole block is executed within a suspend function.
What am I doing wrong? Is there a better way?
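One direction that might be worth trying, sketched under assumptions rather than confirmed against the driver: if the driver finishes the transaction as soon as the stage returned from the transaction function completes (and buffers whatever has not been consumed by then), the cursor loop would have to run inside that function. The future { } builder from kotlinx-coroutines-jdk8 wraps a coroutine in a CompletableFuture, which would let the loop stay suspending without runBlocking. The org.neo4j.driver.v1 package (1.x driver), the driver and params parameters, and the empty body of mySuspendProcessingFunction are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import org.neo4j.driver.v1.Driver
import org.neo4j.driver.v1.Value

// Hypothetical stand-in for the suspending per-record work from the question.
suspend fun mySuspendProcessingFunction(node: Value) {
    // process the node here
}

// Assumption: driver and params are passed in; the question keeps them in outer scope.
suspend fun query(driver: Driver, params: Map<String, Any>) = coroutineScope {
    val session = driver.session()
    try {
        session.readTransactionAsync { tx ->
            // future { } starts a coroutine in this scope and exposes it as a
            // CompletableFuture, so the stage handed back to the driver only
            // completes after the last record has been processed.
            future {
                val cursor = tx.runAsync("query ...", params).await()
                var record = cursor.nextAsync().await()
                while (record != null) {
                    mySuspendProcessingFunction(record.get("node"))
                    record = cursor.nextAsync().await()
                }
            }
        }.await()
    } finally {
        // Start closing the session and wait for it, mirroring closeAsync() above.
        session.closeAsync().await()
    }
}
```

Because future { } is launched inside coroutineScope, cancelling the caller should also cancel the record loop. Whether this actually avoids the eager fetch depends on the driver behaving as assumed above, so it would need to be checked against a large result set.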