That’s quite a lot of code to digest. If I understand your main concern correctly, then it is not really related to blocking IO. You seem to try to design your code in the way that suspend functions don’t… suspend.
I believe it is (almost?) always a bad idea to start background tasks from suspend functions. Suspend functions are marked as “suspend” for a reason. They tell their callers: “I will wait for whatever is needed to be done”. Scheduling asynchronous tasks may be confusing to the caller.
So my suggestion is to let suspend functions suspend. Then, the caller could choose whether invoke some function synchronously (by invoking it directly) or asynchronously (with launch()
/async()
).
So:
- Remove all these
CoroutineScope(Dispatchers.Default).launch { }
and invoke their body directly - as a result, your functions will suspend.
- Make
Server.listen()
, Connection.startListen()
also synchronous, so make them suspendable and remove launch()
/ replace with withContext()
.
- Whenever you need to start some kind of a long running service, but you don’t want to wait for it, start it inside
launch()
.
For example, your Node.startServer()
will become something like this:
suspend fun startServer(serverPort: Int) {
coroutineScope {
val socketHandler = Channel<Socket>()
val serverSocket = socketFactory.createServerSocket(serverPort)
server = Server(serverSocket, socketHandler, CoroutineScope(Dispatchers.IO))
launch { server.listen() }
socketHandler.consumeEach { socket ->
println("Received ${socket} at handler")
val connection = getConnectionAndStartListen(socket)
inboundConnections.add(connection)
}
}
}
It starts two long-running operations: listen()
and consumeEach()
and waits for both of them. Then use startServer()
like this:
launch { node1.startServer(1111) }
As I said above, this way the caller is in control of how to invoke a long running operation. The code is synchronous by default, but you can make it asynchronous where it is needed. In your original code you made almost everything asynchronous, because you were concerned about “blocking”. If you prefer asynchronous code then you don’t really need coroutines
After such redesign you have a proper structured concurrency, because all coroutines are descendants of the coroutine started by runBlocking()
.
Also, note that blocking IO operations are not really cancellable. You need to make sure that you close all resources, because otherwise coroutines may freeze waiting in a blocking IO even after cancellation.