Problem with coroutines in a multi-threaded application


#1

Hi,

I am currently trying to use coroutines to create a sequence of (mapped) database entries.
My code looks like this:

fun <T> streamOfResult(connectionSupplier: () -> Connection,
                       sql: String,
                       tableRowConverter: (ResultSet) -> T,
                       maxRows: Int = Integer.MAX_VALUE): Stream<T> {

    val sequence = buildSequence<T> {
        connectionSupplier().use {
            it.prepareStatement(sql).use { stmt ->
                if (maxRows < Integer.MAX_VALUE) {
                    stmt.maxRows = maxRows
                }
                StreamLogger.log.debug("${Thread.currentThread()}: executing query")
                val resultSet = stmt.executeQuery()
                while (resultSet.next()) {
                    StreamLogger.log.debug("${Thread.currentThread()}: yield result")
                    yield(tableRowConverter.invoke(resultSet))
                }
                StreamLogger.log.debug("${Thread.currentThread()}: closing resultset")
                resultSet.close()
            }
        }
    }
    return sequence.asStream()
}

In my application the resulting stream is fully consumed by collecting or counting the entries.

This function works fine when it is called from a single thread (e.g. 100 times in a row).
But when the function is called from 100 threads in parallel, the log entry “closing resultset” is never written. The result set, statement and database connection remain open.

Unfortunately I’m not able to use the “Exposed” framework in this context.

Any ideas what is wrong with my code?


#2

I found the problem.

The processing chain of the stream needs a second database connection. The connection pool cannot provide it because its size is limited to less than nThreads/2: the threads that already hold a connection for the stream wait for a second one, which never becomes free.
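
The starvation can be reproduced without a database. The following is a minimal sketch, not the original application: a Semaphore stands in for the connection pool, and countStarvedWorkers, poolDrained and allReported are hypothetical names introduced only for this illustration. Timeouts are used so that the demo terminates; with a blocking pool the threads would simply hang.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: returns how many workers could not get every
// connection they needed. The Semaphore is a stand-in for the pool.
fun countStarvedWorkers(poolSize: Int, nThreads: Int, timeoutMs: Long): Int {
    require(poolSize in 1..nThreads) { "sketch assumes 1 <= poolSize <= nThreads" }
    val pool = Semaphore(poolSize)
    val poolDrained = CountDownLatch(poolSize)  // opens once every connection is taken
    val allReported = CountDownLatch(nThreads)  // opens once every worker has an outcome
    val starved = AtomicInteger(0)

    val workers = List(nThreads) {
        thread {
            // First connection: the one the stream keeps open while it is consumed.
            if (!pool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                starved.incrementAndGet()
                allReported.countDown()
                return@thread
            }
            try {
                // Wait until the pool is empty so the outcome is deterministic.
                poolDrained.countDown()
                poolDrained.await()
                // Second connection: requested while the first one is still held.
                if (pool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                    pool.release()
                } else {
                    starved.incrementAndGet()
                }
                // Keep the first connection until every worker has reported.
                allReported.countDown()
                allReported.await()
            } finally {
                pool.release()
            }
        }
    }
    workers.forEach { it.join() }
    return starved.get()
}
```

Calling countStarvedWorkers(poolSize = 2, nThreads = 5, timeoutMs = 100) reports all five workers as starved: two hold a connection and cannot get a second one, and the other three never get a first one.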

Are there any other potential problems with this code?


#3

Obviously talking to myself here. ;)

The problem with the code above is that the result set, statement and connection are not closed when the stream is not consumed completely (e.g. when using Stream.findFirst()).
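
The effect can be shown without JDBC. This is a small sketch under assumptions: FakeResource, rows and isClosedAfter are hypothetical names, the resource is a stand-in for the connection, statement and result set, and sequence { } is used here as the current standard library successor of buildSequence.

```kotlin
import java.io.Closeable
import java.util.stream.Stream
import kotlin.streams.asStream

// Hypothetical stand-in for the connection / statement / result set.
class FakeResource : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Same shape as the original function: the resource is opened inside the
// sequence builder and closed only when the builder block runs to its end.
fun rows(resource: FakeResource): Sequence<Int> = sequence {
    resource.use {
        for (row in 1..3) yield(row)
    }
}

// Runs the given stream operation and reports whether the resource was closed.
fun isClosedAfter(consume: (Stream<Int>) -> Unit): Boolean {
    val resource = FakeResource()
    consume(rows(resource).asStream())
    return resource.closed
}
```

isClosedAfter { it.count() } returns true because the builder runs to completion, whereas isClosedAfter { it.findFirst() } returns false: the builder stays suspended at the first yield and the closing code is never reached.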

Currently I have no solution for this problem and have migrated the code to use rx.Observable.

The “Exposed” library has solved the problem. A function like

private fun streamOfResult(
        query: () -> Query,
        deserialize: (ResultRow) -> Event): Stream<Event> {
    return StreamSupport.stream(transaction {
        query().map(deserialize).asIterable().spliterator()
    }, false)
}

works perfectly. Unfortunately, I could not find out how it is solved there.
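
For comparison, one generic way to handle early termination with plain java.util.stream is to register the cleanup with Stream.onClose and have the caller close the stream. This is only a sketch, it is not necessarily how Exposed handles it, and it relies on every caller actually closing the stream. TrackedResource, closingStream and firstRow are hypothetical names.

```kotlin
import java.io.Closeable
import java.util.stream.Stream
import kotlin.streams.asStream

// Hypothetical stand-in for the connection / statement / result set.
class TrackedResource : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// The cleanup is attached to the stream instead of the sequence builder,
// so it runs when the stream is closed, however many rows were read.
fun closingStream(resource: Closeable, rows: Sequence<Int>): Stream<Int> =
    rows.asStream().onClose { resource.close() }

// The caller has to close the stream explicitly; terminal operations such
// as findFirst() do not close it on their own.
fun firstRow(resource: Closeable): Int? {
    val stream = closingStream(resource, sequenceOf(1, 2, 3))
    try {
        return stream.findFirst().orElse(null)
    } finally {
        stream.close()
    }
}
```

With this shape, firstRow reads a single row and the resource is still closed afterwards. The price is that the responsibility for closing moves to the consumer of the stream.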


#4

By the way: the similar Reactor framework has some Kotlin APIs and carries less historical baggage. Its performance is more or less the same (it varies slightly depending on the benchmark).