Coroutines to implement a networking protocol

I’m new to Kotlin, though very proficient in C++17. Kotlin coroutines seem very appealing to me to implement networking protocols. However, I could use some help in understanding coroutines for my use case.

Let’s look at an HTTP client for example. Typically, handling the initial headers requires some sort of per-connection state machine. In the first state, the client is receiving the HTTP response line. In the next state, it is receiving the HTTP response headers. That state continues until an empty line is received, and so on.

Typically, I’d implement this as a set of states and when statements. As soon as I’m done parsing incoming data, I take a state-dependent action. If, for example, I received the newline character of the HTTP response line, I can parse said response line and switch to the next state, and so on.
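For reference, the classical approach described above might look roughly like this. It is only a minimal sketch using the standard library; `ParseState`, `ResponseParser`, and `feed` are names made up for illustration, and the parsing is heavily simplified compared to real HTTP.

```kotlin
// Hypothetical names throughout; a simplified sketch, not a full HTTP parser.
enum class ParseState { STATUS_LINE, HEADERS, BODY }

class ResponseParser {
    var state = ParseState.STATUS_LINE
        private set
    var statusLine: String? = null
        private set
    val headers = mutableMapOf<String, String>()

    // Bytes that arrived but do not yet form a complete line.
    private val pending = StringBuilder()

    // Called by the I/O code whenever a chunk of data arrives.
    fun feed(chunk: String) {
        pending.append(chunk)
        while (state != ParseState.BODY) {
            val end = pending.indexOf("\r\n")
            if (end < 0) return // incomplete line: wait for the next chunk
            val line = pending.substring(0, end)
            pending.delete(0, end + 2)
            handleLine(line)
        }
    }

    // The state-dependent action, taken once a full line is available.
    private fun handleLine(line: String) {
        when (state) {
            ParseState.STATUS_LINE -> {
                statusLine = line
                state = ParseState.HEADERS
            }
            ParseState.HEADERS ->
                if (line.isEmpty()) {
                    state = ParseState.BODY // empty line ends the header block
                } else {
                    headers[line.substringBefore(':').trim()] =
                        line.substringAfter(':', "").trim()
                }
            ParseState.BODY -> Unit // body handling omitted in this sketch
        }
    }
}

fun main() {
    val parser = ResponseParser()
    parser.feed("HTTP/1.1 200 OK\r\nContent-")  // header line is split across chunks
    parser.feed("Type: text/plain\r\n\r\n")
    println("${parser.statusLine} ${parser.headers} ${parser.state}")
}
```

The explicit `state` field and the `pending` buffer are exactly the bookkeeping that a suspending function would keep implicitly in its local variables and its position in the code.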

However, with coroutines, I can implement this entire sequence in one suspending function. Any time I run out of data, I could suspend the function somehow until I get more data.

The question is how to suspend the function. I need to suspend whenever more data is needed. In addition, it would be useful to communicate to the outside world what custom HTTP headers were received to give outside code the chance to evaluate them and decide whether or not they are valid.

In classical code, I’d use callbacks for fetching more data and for letting user code validate custom HTTP headers. In here, I’d like to somehow inform the outside world that I need more data or have HTTP headers to validate, and suspend the function until that operation was performed.

What would the proper approach be? I could fiddle around with continuations manually, though I guess this is discouraged until I really need to touch continuations manually. So, what else? A Sequence perhaps? Or channels? The latter work with only one data type though, right?

First of all, I’d advise you to take a look at ktor and kotlinx-io. Both do network operations.

However, with coroutines, I can implement this entire sequence in one suspending function. Any time I run out of data, I could suspend the function somehow until I get more data.

I can show you this with a StringBuilder:

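// `channel`, `hasLines()`, and `nextLine()` are placeholders for whatever
// line-oriented suspending input you use; they are not a specific library API.
suspend fun fillString(): String {
    val result = StringBuilder()

    while (channel.hasLines()) { // can be suspended
        result.appendLine(channel.nextLine()) // can be suspended until data is received
    }

    return result.toString()
}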

You can do the same with bytes; it is just a loop.
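To make the byte version concrete, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath and that the network side pushes raw chunks into a `Channel<ByteArray>`. `LineReader`, `ResponseHead`, and `readResponseHead` are names invented for this example, and the header parsing is deliberately naive.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: buffers incoming chunks and hands out CRLF-terminated lines.
class LineReader(private val input: ReceiveChannel<ByteArray>) {
    private val pending = StringBuilder()

    // Suspends in receive() whenever the buffer holds no complete line.
    suspend fun readLine(): String {
        while (true) {
            val end = pending.indexOf("\r\n")
            if (end >= 0) {
                val line = pending.substring(0, end)
                pending.delete(0, end + 2)
                return line
            }
            // Assumption: header bytes are ASCII, so decoding chunk by chunk is safe.
            pending.append(input.receive().decodeToString())
        }
    }
}

data class ResponseHead(val statusLine: String, val headers: Map<String, String>)

// The whole sequence reads top to bottom; no explicit state variable is needed.
suspend fun readResponseHead(input: ReceiveChannel<ByteArray>): ResponseHead {
    val reader = LineReader(input)
    val statusLine = reader.readLine()
    val headers = mutableMapOf<String, String>()
    while (true) {
        val line = reader.readLine()
        if (line.isEmpty()) break // empty line ends the header block
        headers[line.substringBefore(':').trim()] = line.substringAfter(':', "").trim()
    }
    return ResponseHead(statusLine, headers)
}

fun main() = runBlocking {
    val input = Channel<ByteArray>()
    launch {
        // Simulated network: data arrives in arbitrary fragments.
        input.send("HTTP/1.1 200 OK\r\nContent-".encodeToByteArray())
        input.send("Type: text/plain\r\n\r\n".encodeToByteArray())
        input.close()
    }
    println(readResponseHead(input))
}
```

If the channel is closed before the header block is complete, `receive()` throws `ClosedReceiveChannelException`, which a real implementation would want to translate into a protocol error.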

The question is how to suspend the function. I need to suspend whenever more data is needed. In addition, it would be useful to communicate to the outside world what custom HTTP headers were received to give outside code the chance to evaluate them and decide whether or not they are valid.

This is a bit more complex, because you need some kind of back-pressure. Of course, you can accept an optional lambda that validates the input, but a more complex design requires more code.
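The optional-lambda variant could be sketched as follows, assuming kotlinx.coroutines for `runBlocking` and `delay`. `HeaderValidator`, `InvalidHeaderException`, and `checkHeaders` are hypothetical names. Because the lambda is a suspend lambda, the caller may do asynchronous work inside it while the parsing coroutine simply waits.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical validator type: returns true when the header is acceptable.
typealias HeaderValidator = suspend (name: String, value: String) -> Boolean

class InvalidHeaderException(name: String) : Exception("Rejected header: $name")

// Runs the validator over every header and fails on the first rejection.
suspend fun checkHeaders(
    headers: Map<String, String>,
    validate: HeaderValidator? = null // optional: no validator means accept everything
) {
    if (validate == null) return
    for ((name, value) in headers) {
        // The protocol coroutine is suspended here until the caller has decided.
        if (!validate(name, value)) throw InvalidHeaderException(name)
    }
}

fun main() = runBlocking {
    val headers = mapOf("X-Token" to "abc", "X-Mode" to "fast")
    checkHeaders(headers) { name, value ->
        delay(10) // stands in for asynchronous work done by the caller
        name != "X-Token" || value.isNotEmpty()
    }
    println("headers accepted")
}
```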

I’d advise you to take a look at Kotlin Flow. You can create a flow that emits byte buffers. You can then filter them, or cancel the flow if you find that part of the response isn’t correct.
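A rough sketch of the flow idea, assuming a recent kotlinx.coroutines version: `lines()` is a made-up operator (it is not part of the library) that turns a flow of byte buffers into a flow of lines, and `takeWhile` stops collecting at the end of the header block.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: re-chunks raw byte buffers into CRLF-terminated lines.
fun Flow<ByteArray>.lines(): Flow<String> = flow {
    val pending = StringBuilder()
    collect { chunk ->
        // Assumption: the data is ASCII, so decoding chunk by chunk is safe.
        pending.append(chunk.decodeToString())
        while (true) {
            val end = pending.indexOf("\r\n")
            if (end < 0) break // incomplete line: wait for the next buffer
            val line = pending.substring(0, end)
            pending.delete(0, end + 2)
            emit(line)
        }
    }
}

fun main() = runBlocking {
    val buffers = flowOf(
        "HTTP/1.1 200 OK\r\nContent-".encodeToByteArray(),
        "Type: text/plain\r\n\r\nbody".encodeToByteArray()
    )
    // takeWhile cancels the upstream flow at the first empty line,
    // so nothing after the header block is processed.
    val head = buffers.lines().takeWhile { it.isNotEmpty() }.toList()
    println(head)
}
```

Rejecting an invalid response can be done the same way: throwing from an operator such as `onEach` terminates the flow with that exception.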

And I’d also like to repeat my initial suggestion: please take a look at ktor and kotlinx-io.

kotlinx-io currently does not do network operations, but it could be hooked into a standard network byte stream.


Okay, here is my first attempt at using coroutines for something like this. This is a custom pairing protocol. Initially, a pairing request is sent to the peer (a device with an LCD on it). The peer responds to that. Then, the peer is instructed to generate and show a 10-digit PIN on its LCD. The user then has to read that PIN and enter it in some sort of user interface so that the coroutine gets that same PIN. Then, the coroutine requests some initial info. That info is encoded using the PIN (that’s why it is needed).

suspend fun performPairing(
    pairingPINCallback: () -> ByteArray,
    dataChannel: Channel<List<Byte>>
) {
    var packetData: List<Byte>
    var packet: Packet

    // Initiate pairing by sending a pairing request packet.
    // The functions that create packets are all called like this: create<packet type>Packet()
    packet = createRequestPairingPacket()
    // toByteList() is a custom extension function to convert Packet to List<Byte>
    dataChannel.send(packet.toByteList())

    // Wait for a response.
    packetData = dataChannel.receive()
    // toPacket() is a custom extension function to convert List<Byte> to Packet
    packet = packetData.toPacket()

    // Check if this is the response we expected; if not,
    // throw an exception, since we cannot continue here.
    if (packet.id != PacketID.PAIRING_PACKET)
        throw IOException("Incorrect packet")

    // Next, the peer shall create a 10-digit PIN and
    // show it on its display. The user has to enter that
    // PIN so that it can be passed over here.
    packet = createShowPINToUserPacket()
    dataChannel.send(packet.toByteList())

    var pin: ByteArray
    while (true) {
        pin = pairingPINCallback.invoke()
        if (pin.size != VALID_PAIRING_PIN_SIZE) {
            // TODO: Re-request the PIN from the user,
            // through some type of UI, then try again
        } else
            break
    }

    // We got the PIN. Now request the initial info
    // from the peer.
    packet = createRequestInitialInfoPacket()
    dataChannel.send(packet.toByteList())

    // Wait for the response and verify it.
    packetData = dataChannel.receive()
    packet = packetData.toPacket()
    if (packet.id != PacketID.INITIAL_INFO_PACKET)
        throw IOException("Incorrect packet")

    // Decode the response (it is encoded with the PIN).
    decodeInitialInfo(packet, pin)

    // Continue processing here
}

Sending and receiving works well with channels. But:

  1. I am currently not sure what flows would gain me in this case.
  2. See the TODO when entering the PIN. I have no idea how to do this. Furthermore, entering the PIN currently makes use of a callback. This is because the outside world has to somehow be notified that a PIN is now needed, plus it needs a way to pass that entered PIN to the coroutine. And, if the PIN is invalid, somehow the coroutine has to be able to re-request that the user enters the PIN. This means that somehow, the rest of the program has to be notified, so that it brings up the PIN dialog again etc. Note that I am not saying that I’d need some specific UI code here. Rather, I’d like to know how I’d suspend the coroutine here properly, and how I’d pass the new PIN to the coroutine.
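One possible way to handle the PIN step, sketched under the assumption that kotlinx.coroutines is available: the coroutine sends a request object over a channel to whoever owns the UI and then suspends on a `CompletableDeferred` until the PIN has been entered. `PinRequest` and `requestPin` are hypothetical names, and the "UI" below is only a stand-in coroutine.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

const val VALID_PAIRING_PIN_SIZE = 10

// Hypothetical request object: tells the UI side that a PIN is needed and
// carries the slot through which the entered PIN is handed back.
class PinRequest(val isRetry: Boolean) {
    val reply = CompletableDeferred<ByteArray>()
}

// Keeps asking until a PIN of the right size arrives.
suspend fun requestPin(pinRequests: SendChannel<PinRequest>): ByteArray {
    var isRetry = false
    while (true) {
        val request = PinRequest(isRetry)
        pinRequests.send(request)        // notify the outside world
        val pin = request.reply.await()  // suspend until the user has answered
        if (pin.size == VALID_PAIRING_PIN_SIZE) return pin
        isRetry = true                   // wrong size: bring up the dialog again
    }
}

fun main() = runBlocking {
    val pinRequests = Channel<PinRequest>()

    // Stand-in for the UI: answers the first request with a PIN that is too
    // short and the retry with one of the expected size.
    launch {
        for (request in pinRequests) {
            val entered =
                if (request.isRetry) ByteArray(VALID_PAIRING_PIN_SIZE) { 1 } else ByteArray(3)
            request.reply.complete(entered)
        }
    }

    val pin = requestPin(pinRequests)
    println("PIN accepted, ${pin.size} digits")
    pinRequests.close() // lets the stand-in UI loop finish
}
```

In `performPairing`, the `while (true)` loop around `pairingPINCallback.invoke()` would then become a single call to something like `requestPin(...)`. A simpler alternative is to declare the callback itself as `suspend () -> ByteArray` and call it in a loop.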

Any ideas?

I am wondering if this would perhaps be a good candidate for the actor model. Any thoughts?