I need to perform some processing in a multi-stage pipeline. At least one stage of the processing involves blocking operations, specifically JDBC calls.
Unless I’m missing something, the
kotlinx.coroutines API isn’t very helpful when there is blocking code around. It’s easy enough to turn a non-blocking suspendable function into a blocking one with
runBlocking, but unless I’m missing something the opposite can’t be done, even if coroutines are given their own threads. Hence, even though channels and
launch are nicer to use, I find myself resorting to
BlockingQueue and an
This brings me to Kotlin’s handling of
BlockingQueue. It seems that trying to use
forEach or a
for loop on a
BlockingQueue results in the non-useful behavior of looping over all the items in the queue at the moment when the
for is executed, rather than repeatedly using
.take() to loop over all the data sent to the queue. So I end up having to wrap the data in
Optional, write a
do loop, check for empty values to indicate end of data, and so on.
So, what’s the cleanest approach here? Is there some technique for using coroutines but still being able to deal with chunks of blocking code? Is there a neat way to loop over everything sent to a queue, performing blocking operations on each element, and then exit?