Cancel blocking function after timeout without 'select'

I am trying to execute a blocking function (accepting incoming TCP connections) from the main thread. For the sake of simplicity, let’s say that this function returns once server.accept() in the following code has returned 100 times:

fun blockingFunc(server: ServerSocket): Int {
    var numSockets = 0
    repeat(100) {
        runCatching {
            server.accept()
            numSockets++
        }
    }

    return numSockets
}

Executing the function like this will obviously block until all 100 clients have connected:

fun main() {
    val server = ServerSocket(2000)
    val clients = server.use(::blockingFunc)
    println("$clients connected clients.")
}

However, I would like to execute blockingFunc with a timeout in place. Since select and onTimeout are still experimental features, I would like to achieve the same thing without them.

In principle, the following code seems to work:

fun main() {
    val server = ServerSocket(2000)
    val clients = runBlocking {
        val asyncRes = async(Dispatchers.IO) {
            server.use(::blockingFunc)
        }

        val timeoutJob = launch(Dispatchers.Default) {
            delay(1000)
            println("Timeout reached!")
            runCatching {
                server.close()
            }
        }

        asyncRes.await().also {
            timeoutJob.cancelAndJoin()
        }
    }

    println("$clients connected clients.")
}

Note that when server.close() is executed, a pending server.accept() call will throw a SocketException.
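
To illustrate that behaviour in isolation, here is a minimal sketch that uses only the JDK and a plain thread, no coroutines. The helper name acceptUntilClosed and the use of port 0 (let the OS pick a free port) are assumptions made for this example and are not part of the code above.

```kotlin
import java.net.ServerSocket
import java.net.SocketException
import kotlin.concurrent.thread

// Hypothetical helper: blocks in accept() and returns the exception that
// accept() throws once another thread closes the socket, or null if a
// client happened to connect first.
fun acceptUntilClosed(closeAfterMillis: Long): Exception? {
    val server = ServerSocket(0) // port 0: any free port (assumption for the demo)
    val closer = thread {
        Thread.sleep(closeAfterMillis)
        server.close() // unblocks the pending accept() below
    }
    return try {
        server.accept().close()
        null
    } catch (e: SocketException) {
        e
    } finally {
        closer.join()
        server.close() // closing twice is harmless
    }
}

fun main() {
    val failure = acceptUntilClosed(closeAfterMillis = 200)
    println("accept() ended with: $failure")
}
```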

I am wondering about the following two things, though:

  1. Is the code launched as timeoutJob guaranteed to be executed, or could a long-running computation in one of the other threads prevent the timeout from firing? In the latter case, do I have to introduce a new, single thread dedicated to this code, or is there a cheaper solution?
  2. Is there a more idiomatic or generally cleaner solution to the described problem?

I think it would be more flexible to not close the resource on timeout, but on cancellation. There may be more reasons to stop the blocking operation, not only timeout.

A utility could look something like this:

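suspend fun <T : Closeable, R> T.closeOnCancel(block: suspend (T) -> R): R = coroutineScope {
    try {
        // await() is cancellable, so the caller can be cancelled even while
        // block is stuck inside a blocking call.
        async { block(this@closeOnCancel) }.await()
    } catch (e: CancellationException) {
        // Closing the resource makes the blocking call fail, so the child can finish.
        close()
        throw e
    }
}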

Then we can use it like this:

withContext(Dispatchers.IO) {
    val server = ServerSocket(2000)

    val sock = withTimeout(5000) {
        server.closeOnCancel {
            server.accept()
        }
    }

    val bytes = withTimeout(5000) {
        sock.closeOnCancel {
            sock.getInputStream().readNBytes(4)
        }
    }

    println(bytes.contentToString())
}

This is just a proof of concept; it may need some improvements.
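
As a complementary sketch for the timeout-only case: the timeout can also wrap just the await, so that the blocking coroutine is never cancelled and simply ends on its own once the socket is closed. The helper name acceptOrNull and the use of port 0 are assumptions for this example. Note that it handles only the timeout; reacting to external cancellation would still need something like closeOnCancel.

```kotlin
import kotlinx.coroutines.*
import java.net.ServerSocket
import java.net.Socket
import java.net.SocketException

// Hypothetical helper: waits for one client and returns its socket, or
// closes the server and returns null if nobody connects in time.
suspend fun acceptOrNull(server: ServerSocket, timeoutMillis: Long): Socket? = coroutineScope {
    val pending = async(Dispatchers.IO) {
        // close() makes a pending accept() throw; map that to "no client".
        try { server.accept() } catch (e: SocketException) { null }
    }
    // Only the wait is subject to the timeout, not the blocking call itself.
    withTimeoutOrNull(timeoutMillis) { pending.await() }
        ?: run {
            server.close()   // unblocks accept() in the IO thread
            pending.await()  // still returns a socket if a client won the race
        }
}

fun main() = runBlocking {
    val server = ServerSocket(0) // port 0: any free port (assumption for the demo)
    val socket = acceptOrNull(server, timeoutMillis = 500)
    println("socket = $socket, server closed = ${server.isClosed}")
}
```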

Also, I think you should not be afraid of using experimental features of coroutines. There are a lot of components that are marked as experimental, but in fact they are pretty much stable.

Thank you, this is in fact a nice solution. Looking behind the abstraction, I am wondering how many threads are involved here and if there might be a corner case that causes the code to fail.

I am going to write down my current understanding of the code and would appreciate it if someone could confirm or correct the following statements:

  1. When withContext is called, the primary thread (A) suspends and will be blocked until the println statement returns.
  2. A dedicated thread (B) from the Dispatchers.IO pool will execute all statements in the passed block sequentially.
  3. After creating the ServerSocket instance, this thread (B) obtains another thread (C) from the Dispatchers.IO pool to execute the block passed to withTimeout. It suspends until the timeout is reached or the block returns.
  4. Meanwhile, the new thread (C) calls async, which will run the block passed to it in another thread (D) from the Dispatchers.IO pool. Now, thread C will wait for await, while thread D waits for accept.
  5. If, for instance, a timeout is reached, the withTimeout call blocking thread B will cancel the await in thread C. This will cause the ServerSocket to be closed, thread C to become available again, and the accept call blocking thread D to return. Since there are no further statements to execute for thread D, this will also make thread D available again.
  6. Without waiting for thread D, the second thread (B) moves on to the second withTimeout call. Analogous to the first case, two additional threads are needed. These might be C and D (given that thread D is already available again). If thread D is not yet available and the pool does not offer any other thread, the async call will wait for thread D.

So am I correct to assume that this solution needs Dispatchers.IO to have three threads (B, C, and D) available, and that it is my responsibility to ensure this?

You understand the flow of the code correctly, but keep in mind that suspending a coroutine doesn’t block its thread. This is the main point of coroutines. You can run 1000 concurrent coroutines, each waiting for a second, on just one thread, and it will take about 1 s to finish:

repeat(1000) {
    launch { delay(1000) }
}
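
A runnable version of this snippet might look like the sketch below. The helper name runDelays is an assumption for the example. It relies on runBlocking without an explicit dispatcher, which runs its child coroutines on the calling thread, and it records the Thread objects that resumed the coroutines so the thread count can be checked.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: starts `count` coroutines that each suspend for
// `delayMillis` and returns the set of threads that resumed them.
fun runDelays(count: Int, delayMillis: Long): Set<Thread> {
    // A plain set is enough here: everything runs on the calling thread.
    val threads = mutableSetOf<Thread>()
    runBlocking {
        repeat(count) {
            launch {
                delay(delayMillis) // suspends, does not block the thread
                threads += Thread.currentThread()
            }
        }
    } // runBlocking returns only after all children have finished
    return threads
}

fun main() {
    var threads: Set<Thread> = emptySet()
    val elapsed = measureTimeMillis { threads = runDelays(1000, 1000) }
    println("took $elapsed ms on ${threads.size} thread(s)")
}
```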

In our case above (back to IO), we only need 2 threads to run the code correctly. One thread will be blocked inside accept()/readNBytes() and another can do everything else. The latter can still do other things that your application does using Dispatchers.IO. So this code reserves only one thread for itself due to blocking IO and requires at least one additional thread that can be shared with other parts of the application. Also, IO threads are taken from a thread pool, so acquiring one is not expensive.

Usually, you don’t have to care about threads when using coroutines. By default, Dispatchers.IO is limited to 64 threads (or the number of CPU cores, if that is larger), which means your application shouldn’t run more than 63 simultaneous blocking operations, because beyond that IO operations will get queued.
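
The queuing effect can be observed on a small scale with the sketch below. It does not use Dispatchers.IO itself; as an assumption for the demo it builds a deliberately tiny fixed thread pool and uses Thread.sleep as a stand-in for a blocking call such as accept(). The helper name timeBlockingTasks is hypothetical.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical helper: runs `tasks` blocking calls of `blockMillis` each on
// a pool of `poolSize` threads and returns the elapsed time in milliseconds.
fun timeBlockingTasks(tasks: Int, poolSize: Int, blockMillis: Long): Long {
    val dispatcher = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    // use {} closes the dispatcher and shuts down its threads afterwards.
    return dispatcher.use { pool ->
        measureTimeMillis {
            runBlocking {
                repeat(tasks) {
                    // Each task occupies one pool thread for the whole call.
                    launch(pool) { Thread.sleep(blockMillis) }
                }
            }
        }
    }
}

fun main() {
    // With more blocking tasks than threads, the extra tasks have to wait.
    println("4 tasks on 2 threads: ${timeBlockingTasks(4, 2, 200)} ms")
    println("4 tasks on 4 threads: ${timeBlockingTasks(4, 4, 200)} ms")
}
```

With four tasks on two threads, the second pair can only start after the first pair has finished, so the total time is roughly twice that of a single blocking call.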