How to use structured concurrency correctly with blocking IO code?

I try to use Kotlin for a P2P application and started with the basic structure for a network node with a server and a connection class for handling the sending and receiving of messages.

I need to use blocking IO as my use case it to use Tor and non-blocking IO does not support SocksProxy.

I tried to stick with the concepts of structured concurrency as far as I understand it (being a Kotlin beginner) but the only way I get my code running without getting blocked is by using CoroutineScope(Dispatchers.IO).launch {...}.

As far I understand my current version would require manual resource management, as if the parent scope terminates my standalone scopes would not get canceled.

Beside the usage in the blocking IO functions I also did not find a way how to use the existing scope for the channel receivers. Also here it only works by using a standalone scope.
I guess the problem here is that the sender at that channel is the blocking IO scope already running in a standalone scope…

Hope to deal with such a setup in the right way?
Any help/feedback very appreciated! Thanks in advance!!

My code is posted here:

https://play.kotlinlang.org/#

That’s quite a lot of code to digest. If I understand your main concern correctly, then it is not really related to blocking IO. You seem to try to design your code in the way that suspend functions don’t… suspend.

I believe it is (almost?) always a bad idea to start background tasks from suspend functions. Suspend functions are marked as “suspend” for a reason. They tell their callers: “I will wait for whatever is needed to be done”. Scheduling asynchronous tasks may be confusing to the caller.

So my suggestion is to let suspend functions suspend. Then, the caller could choose whether invoke some function synchronously (by invoking it directly) or asynchronously (with launch()/async()).

So:

  1. Remove all these CoroutineScope(Dispatchers.Default).launch { } and invoke their body directly - as a result, your functions will suspend.
  2. Make Server.listen(), Connection.startListen() also synchronous, so make them suspendable and remove launch() / replace with withContext().
  3. Whenever you need to start some kind of a long running service, but you don’t want to wait for it, start it inside launch().

For example, your Node.startServer() will become something like this:

suspend fun startServer(serverPort: Int) {
    coroutineScope {
        val socketHandler = Channel<Socket>()
        val serverSocket = socketFactory.createServerSocket(serverPort)
        server = Server(serverSocket, socketHandler, CoroutineScope(Dispatchers.IO))

        launch { server.listen() }
        
        socketHandler.consumeEach { socket ->
            println("Received ${socket} at handler")
            val connection = getConnectionAndStartListen(socket)
            inboundConnections.add(connection)
        }
            
    }
}

It starts two long-running operations: listen() and consumeEach() and waits for both of them. Then use startServer() like this:

launch { node1.startServer(1111) }

As I said above, this way the caller is in control of how to invoke a long running operation. The code is synchronous by default, but you can make it asynchronous where it is needed. In your original code you made almost everything asynchronous, because you were concerned about “blocking”. If you prefer asynchronous code then you don’t really need coroutines :slight_smile:

After such redesign you have a proper structured concurrency, because all coroutines are descendants of the coroutine started by runBlocking().

Also, note that blocking IO operations are not really cancellable. You need to make sure that you close all resources, because otherwise coroutines may freeze waiting in a blocking IO even after cancellation.

Great thanks. I managed to resolved most cases already similar to what you suggested. Only the connection handler I am still making troubles… I don’t want that the outside caller has to know about the internals of Node. So when a new connection gets created from an incoming connection I need to setup the async listeners but as that code is inside the socket listener its a bit more complicated as the other cases.

Damn my reply got deleted from the spam filter…

Hope this one will make it:

Thanks @broot for your explainations! Got more clear now.

Here is the fixed code:
https://play.kotlinlang.org/#