Idiomatic suspend network async functions

Hello, what is the idiomatic way of writing this code?

suspend fun getUsers(): CompletableFuture<ArrayList<UserEntity>> {
    var users =  arrayListOf<UserEntity>()
    val future = CompletableFuture<ArrayList<UserEntity>>()
    pgQuerier.execQuery("""
        select *
        from "Users"
      """.trimIndent()) { qr, conn ->
      try {
        if (qr.succeeded()) {
          val rows = qr.result()
          rows.forEach { row ->
            val user = getNewUser(row)
            if (user != null) {
              users.add(user)
            }
          }
        } else {
          println("Failed to get users: ${qr.cause().message}")
        }
      }
      catch(ex: java.lang.Exception) {
        println(ex.message)
      } finally {
        conn.close()
        future.complete(users)
      }
    }

    return future
  }

 CoroutineScope(Dispatchers.IO).launch {
      println("start users 1")
      val users1 = getUsers().get()
      println("end users 1 $users1")
      println("start users 2")
      val users2 = getUsers().get()
      println("end users 2 $users2")
    }

I have a few side remarks; I hope they are not off topic.
Why not make it more generic? Instead of getUsers, something like executeQuery?

suspend fun <T> executeQuery(query: String, extractRowFn: (Any) -> T)

*“Any” above can be replaced by the datatype you expect for your row

Now you can potentially declare every query in the same way by doing:

val getUser = executeQuery("""
        select *
        from "Users"
      """.trimIndent(), ::getNewUser)

getUser.get()...

Unfortunately, you can’t return a Result object from your function due to a Kotlin restriction (the restriction was lifted in Kotlin 1.5).
So it would be nice to have a sealed class instead, because your output can be either a query result (ArrayList) or a rejection (an exception?); in both cases you can handle it easily with pattern matching. Printing a string on error doesn’t tell the caller anything (I guess you added the printlns as an example, but it is worth saying).
You might not need that forEach; you can simply map qr.result() with the extractRowFn function above.
Should this function care about closing the connection? I don’t know your specific case or the libraries you are using, but normally we would group those calls in a block and then close the connection. For this reason it might be worth passing pgQuerier as a parameter (with a default value?); after all, it is a sort of service you could mock for testing.
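
To make the sealed class suggestion concrete, here is a minimal sketch. QueryOutcome, Row, mapRows and describe are hypothetical names made up for illustration, and UserEntity/getNewUser are simplified stand-ins for the ones in the question, so treat this as one possible shape rather than the definitive design.

```kotlin
// Hypothetical types for illustration; not part of any library.
sealed class QueryOutcome<out T> {
    data class Success<T>(val rows: List<T>) : QueryOutcome<T>()
    data class Failure(val cause: Throwable) : QueryOutcome<Nothing>()
}

// Stand-in for a database row.
typealias Row = Map<String, Any?>

data class UserEntity(val id: Long, val name: String)

// Returns null for rows that cannot be mapped, like getNewUser in the question.
fun getNewUser(row: Row): UserEntity? {
    val id = row["id"] as? Long ?: return null
    val name = row["name"] as? String ?: return null
    return UserEntity(id, name)
}

// Maps raw rows with the supplied extractor (mapNotNull replaces the forEach)
// and wraps any exception in Failure instead of printing it.
fun <T : Any> mapRows(rows: List<Row>, extractRowFn: (Row) -> T?): QueryOutcome<T> =
    try {
        QueryOutcome.Success(rows.mapNotNull(extractRowFn))
    } catch (ex: Exception) {
        QueryOutcome.Failure(ex)
    }

// The caller decides what to do in each case; `when` is exhaustive here.
fun describe(outcome: QueryOutcome<UserEntity>): String = when (outcome) {
    is QueryOutcome.Success -> "got ${outcome.rows.size} user(s)"
    is QueryOutcome.Failure -> "query failed: ${outcome.cause.message}"
}

fun main() {
    val rows: List<Row> = listOf(
        mapOf("id" to 1L, "name" to "Ann"),
        mapOf("id" to "oops", "name" to "Bob") // unmappable row is skipped
    )
    println(describe(mapRows(rows, ::getNewUser))) // prints "got 1 user(s)"
}
```

Because the compiler knows every subclass of the sealed class, adding a third case later would make each `when` that lacks it fail to compile.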

I appreciate your help. But for now, what I’m looking for is an idiomatic async network call in Kotlin using suspend and coroutines.

The suspendCoroutine method is how you convert callback-based async code into suspend methods.

import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

suspend fun PgQuerierClass.awaitExecQuery(query: String): Pair<QueryClass, ConnectionClass> {
    // Suspends the caller until the callback fires
    return suspendCoroutine { continuation ->
        execQuery(query) { qr, conn ->
            // resume supplies the value that suspendCoroutine returns
            continuation.resume(qr to conn)
        }
    }
}
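
For anyone who wants to try the pattern in isolation, here is a self-contained sketch that also forwards failures with resumeWithException, so callers can use ordinary try/catch. FakeQuerier and QueryResult are hypothetical stand-ins (not the real pgQuerier or Vert.x types), and the fake invokes its callback synchronously, which a real driver would not do.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style result, loosely modelled on the qr object above.
class QueryResult(private val rows: List<String>?, private val error: Throwable?) {
    fun succeeded() = error == null
    fun result(): List<String> = rows ?: emptyList()
    fun cause(): Throwable? = error
}

// Hypothetical callback-based client; it calls back synchronously here,
// whereas a real driver would call back later, possibly on another thread.
class FakeQuerier {
    fun execQuery(sql: String, callback: (QueryResult) -> Unit) {
        if (sql.startsWith("select")) {
            callback(QueryResult(listOf("ann", "bob"), null))
        } else {
            callback(QueryResult(null, IllegalArgumentException("unsupported: $sql")))
        }
    }
}

// Bridges the callback into a suspend function: success resumes with the rows,
// failure resumes with the exception.
suspend fun FakeQuerier.awaitRows(sql: String): List<String> =
    suspendCoroutine { continuation ->
        execQuery(sql) { qr ->
            if (qr.succeeded()) {
                continuation.resume(qr.result())
            } else {
                continuation.resumeWithException(
                    qr.cause() ?: IllegalStateException("unknown failure")
                )
            }
        }
    }

suspend fun main() {
    val querier = FakeQuerier()
    println(querier.awaitRows("select * from \"Users\"")) // prints [ann, bob]
    try {
        querier.awaitRows("drop table \"Users\"")
    } catch (ex: IllegalArgumentException) {
        println("failed: ${ex.message}")
    }
}
```

If the underlying call can be cancelled, suspendCancellableCoroutine from kotlinx.coroutines is probably the better building block, since it lets the coroutine react to cancellation while it is waiting.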

Then you can use the new helper method to write the code without any callback (also incorporated mapNotNull and use):

suspend fun getUsers(): List<UserEntity> {
    // Destructure the Pair returned by awaitExecQuery
    val (qr, conn) = pgQuerier.awaitExecQuery("""
        select *
        from "Users"
      """.trimIndent())
    // use closes the connection even if the block throws
    return conn.use {
        try {
            if (qr.succeeded()) {
                // mapNotNull drops rows for which getNewUser returns null
                qr.result().mapNotNull { row -> getNewUser(row) }
            } else {
                println("Failed to get users: ${qr.cause().message}")
                emptyList()
            }
        } catch (ex: Exception) {
            println(ex.message)
            emptyList()
        }
    }
}

If the method is used more generally at all, I’d suggest pulling the conn.use into a second helper so it doesn’t need to be dealt with.
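
A rough sketch of what that second helper could look like. FakeQuerier and FakeConnection are hypothetical stand-ins for the real driver types, and the helper name query is made up; the only assumption about the real connection is that it is Closeable, so that use applies.

```kotlin
import java.io.Closeable
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-ins for the driver types in this thread.
class FakeConnection : Closeable {
    var closed = false
        private set
    override fun close() { closed = true }
}

class FakeQuerier(val connection: FakeConnection = FakeConnection()) {
    fun execQuery(sql: String, callback: (List<String>, FakeConnection) -> Unit) =
        callback(listOf("ann", "bob"), connection)
}

// First helper: callback -> suspend function.
suspend fun FakeQuerier.awaitExecQuery(sql: String): Pair<List<String>, FakeConnection> =
    suspendCoroutine { cont ->
        execQuery(sql) { rows, conn -> cont.resume(rows to conn) }
    }

// Second helper: owns the connection lifetime, so call sites never see it.
suspend fun <R> FakeQuerier.query(sql: String, transform: (List<String>) -> R): R {
    val (rows, conn) = awaitExecQuery(sql)
    return conn.use { transform(rows) }
}

suspend fun main() {
    val querier = FakeQuerier()
    val names = querier.query("select * from \"Users\"") { rows ->
        rows.map { it.uppercase() }
    }
    println(names)                     // prints [ANN, BOB]
    println(querier.connection.closed) // prints true
}
```

With this split, getUsers shrinks to a single call to query plus the row mapping.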

(Note: didn’t run the code so please forgive any typos)

Beautiful, just what I needed. Now here’s the thing: I’m using graphql-java and vertx, both of which are Java frameworks. When I leave coroutine land I need to emit the types that the frameworks expect. Would this be a reasonable way of doing it? I guess you could say CompletableFuture works somewhat similarly to the suspendCoroutine you showed me?

fun get(environment: DataFetchingEnvironment?): CompletableFuture<UserEntity?> {
  val future = CompletableFuture<UserEntity?>()
  CoroutineScope(Dispatchers.IO).launch {
    future.complete(EntityData.getUser(
      environment?.getArgument<Long>("id")
      ?: 0))
  }
  return future
}

If you are working with libraries that use CompletableFuture, check out the kotlinx-coroutines-jdk8 integration module. It takes care of that boilerplate for you.
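
As a sketch of what that looks like: the future builder from kotlinx.coroutines.future (part of kotlinx-coroutines-jdk8, and bundled into kotlinx-coroutines-core in recent releases) starts a coroutine and exposes its result as a CompletableFuture. UserEntity, getUser, fetcherScope and fetchUser below are hypothetical stand-ins for EntityData.getUser and the graphql-java data fetcher.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture

data class UserEntity(val id: Long, val name: String)

// Hypothetical stand-in for EntityData.getUser.
suspend fun getUser(id: Long): UserEntity? {
    delay(10) // pretend to wait for the database
    return if (id > 0) UserEntity(id, "user$id") else null
}

// One long-lived scope instead of a new CoroutineScope per call.
val fetcherScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

// The builder completes the future with the block's result, or completes it
// exceptionally if the block throws.
fun fetchUser(id: Long?): CompletableFuture<UserEntity?> =
    fetcherScope.future { getUser(id ?: 0) }

fun main() {
    println(fetchUser(7).get())    // prints UserEntity(id=7, name=user7)
    println(fetchUser(null).get()) // prints null
}
```

Compared with completing the future by hand inside launch, an exception thrown by getUser reaches the caller here instead of leaving the future pending forever. The same module also has an await() extension for going the other way, from a CompletableFuture into a suspend function.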

Thanks for the reminder. I totally forgot about that.

Hi, @jsoneaday,
the idiomatic way of

suspend fun getUsers(): CompletableFuture<ArrayList<UserEntity>>

can be

fun getUsers(): Flow<UserEntity>

You could also consider a reactive library, but you should avoid suspend fun xxx(): CompletableFuture<...> and avoid returning a Result; throwing an exception is the idiomatic choice in Kotlin.


I like your idea of using Flow. However, how beneficial would that be if I will only ever return a single object (even if it is a collection of things)? I thought Flow was for hot connections that can continuously return multiple things, like a stream.

If you need a list of results, nickallendev’s answer is what you need.

Flow is an asynchronous Sequence, so you can map a ResultSet to a Sequence or a Flow, e.g.:

suspend fun Connection.execQuerySuspend(sql: String): Query {
    return suspendCoroutine { continuation ->
        execQuery(sql) { query, _ ->
            continuation.resume(query)
        }
    }
}

suspend fun <T, R> Connection.execQuerySequence(sql: String, mapper: ResultSet.() -> T, block: (Sequence<T>) -> R): R {
    val query = execQuerySuspend(sql)
    if (!query.succeeded()) throw query.cause()
    return query.result().use { rs ->
        val resultSequence = sequence<T> { while (rs.next()) yield(rs.mapper()) }
        block(resultSequence)
    }
}

suspend fun getUsers(): List<UserEntity> =
        pgQuerier.execQuerySequence("""
        select *
        from "Users"
      """.trimIndent(), ResultSet::toUserEntity) { it.toList() }