I need to perform some processing in a multi-stage pipeline. At least one stage of the processing involves blocking operations, specifically JDBC calls.
Unless I’m missing something, the kotlinx.coroutines
API isn’t very helpful when there is blocking code around. It’s easy enough to turn a non-blocking suspendable function into a blocking one with runBlocking
, but unless I’m missing something the opposite can’t be done, even if coroutines are given their own threads. Hence, even though channels and launch
are nicer to use, I find myself resorting to BlockingQueue
and an Executor
.
This brings me to Kotlin’s handling of BlockingQueue
. It seems that trying to use forEach
or a for
loop on a BlockingQueue
results in the non-useful behavior of looping over all the items in the queue at the moment when the for
is executed, rather than repeatedly using .take()
to loop over all the data sent to the queue. So I end up having to wrap the data in Optional
, write a do
loop, check for empty values to indicate end of data, and so on.
So, what’s the cleanest approach here? Is there some technique for using coroutines but still being able to deal with chunks of blocking code? Is there a neat way to loop over everything sent to a queue, performing blocking operations on each element, and then exit?