Hi,
I am currently trying to use coroutines to create a sequence of (mapped) database entries.
My code currently looks like this:
fun <T> streamOfResult(connectionSupplier: () -> Connection,
                       sql: String,
                       tableRowConverter: (ResultSet) -> T,
                       maxRows: Int = Integer.MAX_VALUE): Stream<T> {
    val sequence = buildSequence<T> {
        connectionSupplier().use {
            it.prepareStatement(sql).use { stmt ->
                if (maxRows < Integer.MAX_VALUE) {
                    stmt.maxRows = maxRows
                }
                StreamLogger.log.debug("${Thread.currentThread()}: executing query")
                val resultSet = stmt.executeQuery()
                while (resultSet.next()) {
                    StreamLogger.log.debug("${Thread.currentThread()}: yield result")
                    yield(tableRowConverter.invoke(resultSet))
                }
                StreamLogger.log.debug("${Thread.currentThread()}: closing resultset")
                resultSet.close()
            }
        }
    }
    return sequence.asStream()
}
In my application the resulting stream is fully consumed by collecting or counting the entries.
This function works well when it is called from a single thread (e.g. 100 times in a row).
But when the function is called from 100 threads in parallel, the log entry “closing resultset” is never written, and the result set, statement, and database connection remain open.
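The parallel usage described above can be sketched without a database roughly as follows. This is only a minimal sketch under assumptions, not the real code: `FakeResource`, `streamOfNumbers`, and `runInParallel` are hypothetical names, the resource is a plain `Closeable` standing in for the connection/statement/result set chain, and `sequence` is used as the current standard-library name for `buildSequence`.

```kotlin
import java.io.Closeable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Stream
import kotlin.streams.asStream

// Hypothetical stand-in for the Connection/Statement/ResultSet chain.
// It only counts how often close() is called.
class FakeResource(private val closed: AtomicInteger) : Closeable {
    override fun close() {
        closed.incrementAndGet()
    }
}

// Same shape as streamOfResult: the resource is opened lazily inside the
// sequence builder and is closed only once the consumer asks for an element
// after the last yielded one.
fun streamOfNumbers(rows: Int, closed: AtomicInteger): Stream<Int> {
    val seq = sequence {
        FakeResource(closed).use {
            for (i in 1..rows) {
                yield(i)
            }
        }
    }
    return seq.asStream()
}

// Calls streamOfNumbers from `threads` tasks in parallel, fully consumes each
// stream with count(), and returns how many resources were closed.
fun runInParallel(threads: Int, rows: Int): Int {
    val closed = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(threads)
    repeat(threads) {
        pool.execute { streamOfNumbers(rows, closed).count() }
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    return closed.get()
}

fun main() {
    println("closed resources: ${runInParallel(threads = 100, rows = 10)}")
}
```

The sketch keeps the lazy open-inside-the-builder structure of `streamOfResult`, so it may help to check whether the cleanup step is reached independently of the database and logging setup.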
Unfortunately I’m not able to use the “Exposed” framework in this context.
Any ideas what is wrong with my code?