Using threads and blocking queues in Kotlin

I need to perform some processing in a multi-stage pipeline. At least one stage of the processing involves blocking operations, specifically JDBC calls.

Unless I’m missing something, the kotlinx.coroutines API isn’t very helpful when there is blocking code around. It’s easy enough to turn a non-blocking suspending function into a blocking one with runBlocking, but as far as I can tell the opposite can’t be done, even if coroutines are given their own threads. Hence, even though channels and launch are nicer to use, I find myself resorting to BlockingQueue and an Executor.

This brings me to Kotlin’s handling of BlockingQueue. Using forEach or a for loop on a BlockingQueue only iterates over the items that are in the queue at the moment the loop starts, rather than repeatedly calling .take() to process everything sent to the queue. So I end up having to wrap the data in Optional, write a do loop, check for empty values to indicate end of data, and so on.

So, what’s the cleanest approach here? Is there some technique for using coroutines but still being able to deal with chunks of blocking code? Is there a neat way to loop over everything sent to a queue, performing blocking operations on each element, and then exit?

Hi @meta,
there are some mistakes about coroutines in your post, but that is not the point.

If you need to write a small application, such as a simple filter, and you are confident with the Java standard library, I suggest using a regular BlockingQueue and so on…
You can implement all of the software using non-blocking APIs, of course, but the performance difference can be negligible.

So avoid any kind of “holy war” and have fun.

Yes, usually Dispatchers.IO or a custom dispatcher is used for blocking code.
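To illustrate the "custom one" option, here is a minimal sketch of running a blocking call from a coroutine on a dedicated thread pool. The names blockingLookup, lookupAll and jdbcDispatcher are made up for this example, and the Thread.sleep only stands in for a real JDBC call; treat it as one possible shape, not a recommended design.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a blocking call such as a JDBC query.
fun blockingLookup(id: Int): String {
    Thread.sleep(50)
    return "row-$id"
}

fun lookupAll(ids: List<Int>): List<String> =
    // The dispatcher owns its threads; use {} closes it and shuts the pool down.
    Executors.newFixedThreadPool(2).asCoroutineDispatcher().use { jdbcDispatcher ->
        runBlocking {
            ids.map { id ->
                // The calling coroutine suspends while a pool thread blocks.
                withContext(jdbcDispatcher) { blockingLookup(id) }
            }
        }
    }

fun main() {
    println(lookupAll(listOf(1, 2, 3)))
}
```

A fixed-size pool like this caps how many blocking calls run at once, which can be handy when the limit should match something like a connection pool size.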

Consider something like this example:

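 import java.util.concurrent.BlockingQueue;

 class Consumer implements Runnable {
   private final BlockingQueue<Object> queue;
   Consumer(BlockingQueue<Object> q) { queue = q; }
   public void run() {
     try {
       // take() blocks until an element is available
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) {
       // restore the interrupt flag and let the thread exit
       Thread.currentThread().interrupt();
     }
   }
   void consume(Object x) { /* process one element */ }
 }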

My only minor suggestion is to use an explicit sealed class instead of Optional:

sealed class Message {
  data class Item(val value: MyValue) : Message()
  object End : Message()
}
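As a rough sketch of how such a sealed class could replace the Optional-plus-do-loop approach, the queue can be wrapped in a lazy sequence that calls take() until the End marker arrives. The drain and consumeAll helpers are invented for this example, and the String payload is only a placeholder for MyValue.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread

// Hypothetical message type; the String payload stands in for MyValue.
sealed class Message {
    data class Item(val value: String) : Message()
    object End : Message()
}

// Lazily yields every value taken from the queue, stopping at the End marker.
// take() blocks, so iterate this only on a thread that is allowed to block.
fun BlockingQueue<Message>.drain(): Sequence<String> =
    generateSequence<Message> { take() }
        .takeWhile { it !is Message.End }
        .filterIsInstance<Message.Item>()
        .map { it.value }

fun consumeAll(inputs: List<String>): List<String> {
    val queue = ArrayBlockingQueue<Message>(2)
    val received = mutableListOf<String>()

    val consumer = thread {
        for (value in queue.drain()) {
            received += value // a blocking call such as JDBC would go here
        }
    }

    inputs.forEach { queue.put(Message.Item(it)) }
    queue.put(Message.End) // signal end of data
    consumer.join()        // join() makes the consumer's writes visible here
    return received
}

fun main() {
    println(consumeAll(listOf("a", "b", "c"))) // prints [a, b, c]
}
```

With several consumers on one queue, each of them would need its own End marker, or the marker would have to be put back after it is seen.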

That Consumer example is pretty much what I’ve done.

I’ve taken another look at Dispatchers.IO, and tried writing quick toy examples using both coroutines and threads for blocking pipelines. Can you take a look and make sure I’ve understood how to use coroutines correctly?

Thanks.

Hi @meta,
it is ok!

Take a look here:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val queueSize = 2

fun CoroutineScope.source(): ReceiveChannel<Int> = produce(capacity = queueSize) {
    println("Source starting")
    for (i in 1..10) {
        channel.send(i)
        println("Source iteration sent $i")
        val sleep: Long = (400..600).random().toLong()
        withContext(Dispatchers.IO) {
            Thread.sleep(sleep)
        }
    }
    // No explicit channel.close() is needed: produce closes the channel when this block completes.
    println("Source exiting")
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>): ReceiveChannel<String> = produce(capacity = queueSize) {
    println("  Filter starting")
    for (x in numbers) {
        println("  Filter received $x")
        val sleep: Long = (400..600).random().toLong()
        withContext(Dispatchers.IO) {
            Thread.sleep(sleep)
        }
        val y = "'$x'"
        channel.send(y)
        println("  Filter sent $y")
    }
    // No explicit channel.close() is needed: produce closes the channel when this block completes.
    println("  Filter exiting")
}

fun CoroutineScope.output(): SendChannel<String> = actor(capacity = queueSize) {
    println("    Output starting")
    for (x in channel) {
        println("    Output received $x")
        val sleep: Long = (400..600).random().toLong()
        withContext(Dispatchers.IO) {
            Thread.sleep(sleep)
        }
    }
    println("    Output exiting")
}

suspend fun runAll() {
    println("runAll starting")
    coroutineScope {
        val source = source()
        val filter = filter(source)
        val output = output()

        filter.consumeEach { output.send(it) }
        output.close()
    }
    println("runAll exiting")
}

suspend fun main() {
    println("main starting")
    runAll()
    println("main exiting")
}