Kotlin nio socket

Hey, I wanted to try out networking in Kotlin. I tried to use code I got from a website, but it was Java. With Java it works, but with Kotlin it doesn’t, because the messages get combined. Any help or explanation of why this is happening is appreciated.

Output kotlin:

Connection Accepted...
Received message: I like non-blocking servers
Received message: Hello non-blocking world!One more message...exit

Output java:

Connection Accepted...
Reading...
Received message: I like non-blocking servers
Reading...
Received message: Hello non-blocking world!
Reading...
Received message: One more message..
Reading...
Received message: exit
Connection closed...

Here are my classes
Kotlin server
Kotlin client

Java server
Java client

Could you post the code here please? (I see nothing at those links. Plus, external links can change or be taken down, while discussions here could continue to be useful to people for a long time.)

Kotlin server:

// NOTE(review): not read or written by any of the code shown below.
private var running = false
    // Selector shared by the methods below: assigned in start(), used by
    // handleAccept() to register newly accepted client channels.
    private var selector: Selector? = null

    /**
     * Opens the selector, binds a non-blocking server channel to localhost:8089
     * and then runs the event loop forever, handing acceptable keys to
     * [handleAccept] and readable keys to [handleRead].
     */
    fun start() {
        // Keep a non-null local for the loop; the field is still assigned
        // because handleAccept() reads it.
        val activeSelector = Selector.open()
        selector = activeSelector

        // Bind the listening channel and switch it to non-blocking mode.
        val socketChannel = ServerSocketChannel.open()
        socketChannel.socket().bind(InetSocketAddress("localhost", 8089))
        socketChannel.configureBlocking(false)
        // A server channel's only valid operation is OP_ACCEPT, so a single
        // registration is sufficient.
        socketChannel.register(activeSelector, SelectionKey.OP_ACCEPT)

        while (true) {
            // Wake up after at most one second even when nothing is ready.
            activeSelector.select(1000)
            val i = activeSelector.selectedKeys().iterator()
            while (i.hasNext()) {
                val key = i.next()
                if (key.isAcceptable) {
                    // A new client is waiting to be accepted.
                    handleAccept(socketChannel, key)
                } else if (key.isReadable) {
                    // A client channel has data (or end-of-stream) to read.
                    handleRead(key)
                }
                // The selector never clears its selected-key set on its own.
                i.remove()
            }
        }
    }

    /**
     * Accepts one pending connection on [mySocket], switches it to
     * non-blocking mode and registers it with the shared selector for reads.
     *
     * @param mySocket the listening channel whose key was reported acceptable
     * @param key the selection key that triggered this call (not used here)
     */
    private fun handleAccept(
        mySocket: ServerSocketChannel,
        key: SelectionKey
    ) {
        // In non-blocking mode accept() returns null when no connection is
        // pending; there is nothing to register in that case.
        val client = mySocket.accept() ?: return
        println("Connection Accepted...")
        client.configureBlocking(false)

        // Register the client so the event loop is told when it becomes readable.
        val activeSelector = checkNotNull(selector) {
            "start() must open the selector before clients are accepted"
        }
        client.register(activeSelector, SelectionKey.OP_READ)
    }

    /**
     * Reads whatever bytes are currently available on the client channel
     * behind [key] and prints them as one message. The text "exit"
     * (case-insensitive) closes the connection.
     *
     * One call handles whatever a single read() returns, which need not match
     * what the peer passed to a single write(): several writes can show up
     * together in one read.
     */
    private fun handleRead(key: SelectionKey) {
        val client = key.channel() as SocketChannel

        val buffer = ByteBuffer.allocate(1024)
        val read = client.read(buffer)

        if (read < 0) {
            // End-of-stream: the peer closed its side. Without closing here the
            // key stays readable and the event loop spins forever.
            client.close()
            println("Connection closed...")
            return
        }
        if (read == 0) return

        // Decode only the bytes that were actually read, then strip whitespace
        // and control characters from both ends.
        val data = String(buffer.array(), 0, read).trim { it <= ' ' }
        if (data.isEmpty()) return

        println("Received message: $data")
        if (data.equals("exit", ignoreCase = true)) {
            client.close()
            println("Connection closed...")
        }
    }

Kotlin client:

/**
 * Connects to localhost:8089, writes four text messages back to back and
 * closes the connection.
 *
 * The messages are written without any delimiter or length prefix, so the
 * receiver may see several of them in a single read.
 */
fun main() {
    val messages = arrayOf("I like non-blocking servers", "Hello non-blocking world!", "One more message...", "exit")
    println("Starting client...")

    // use { } closes the channel even when a write throws.
    SocketChannel.open(InetSocketAddress("localhost", 8089)).use { client ->
        for (msg in messages) {
            println("Prepared message: $msg")
            // Wrap the encoded bytes directly: no fixed 1024-byte limit on message size.
            val buffer = ByteBuffer.wrap(msg.toByteArray())
            var bytesWritten = 0
            // write() is allowed to send fewer bytes than requested; keep going until done.
            while (buffer.hasRemaining()) {
                bytesWritten += client.write(buffer)
            }
            println("Sending Message: $msg with size $bytesWritten")
        }
    }

    println("Client connection closed")
}

Java server:

private static Selector selector = null;

    /**
     * Entry point of the Java server: opens the shared selector, binds a
     * non-blocking server channel to localhost:8089 and dispatches ready keys
     * to handleAccept / handleRead in an endless loop. Any IOException ends
     * the loop and is printed.
     */
    public static void main(String[] args) {

        try {
            selector = Selector.open();

            // Listening channel: bind host and port, then go non-blocking.
            ServerSocketChannel socket = ServerSocketChannel.open();
            socket.socket().bind(new InetSocketAddress("localhost", 8089));
            socket.configureBlocking(false);
            socket.register(selector, socket.validOps(), null);

            for (;;) {
                selector.select();
                Iterator<SelectionKey> pending = selector.selectedKeys().iterator();

                while (pending.hasNext()) {
                    SelectionKey key = pending.next();

                    if (key.isAcceptable()) {
                        // A client is waiting to be accepted.
                        handleAccept(socket, key);
                    } else if (key.isReadable()) {
                        // A client channel can be read without blocking.
                        handleRead(key);
                    }
                    // Handled keys must be removed from the selected set by hand.
                    pending.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts one pending connection on the listening channel, switches it to
     * non-blocking mode and registers it with the shared selector for reads.
     *
     * @param mySocket the listening channel whose key was reported acceptable
     * @param key      the selection key that triggered this call (not used here)
     * @throws IOException if accepting or configuring the client channel fails
     */
    private static void handleAccept(ServerSocketChannel mySocket,
                                     SelectionKey key) throws IOException {

        // In non-blocking mode accept() returns null when no connection is pending.
        SocketChannel client = mySocket.accept();
        if (client == null) {
            return;
        }

        System.out.println("Connection Accepted...");
        client.configureBlocking(false);

        // Register the client so the event loop is told when it becomes readable.
        client.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Reads whatever bytes are currently available on the client channel
     * behind the key and prints them as one message. The text "exit"
     * (case-insensitive) closes the connection.
     *
     * One call handles whatever a single read() returns, which need not match
     * what the peer passed to a single write().
     *
     * @param key selection key of a readable client channel
     * @throws IOException if reading from or closing the channel fails
     */
    private static void handleRead(SelectionKey key)
            throws IOException {
        System.out.println("Reading...");
        SocketChannel client = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = client.read(buffer);
        if (read < 0) {
            // End-of-stream: the peer closed its side. Without closing here the
            // key stays readable and the event loop spins forever.
            client.close();
            System.out.println("Connection closed...");
            return;
        }

        // Decode only the bytes that were actually read.
        String data = new String(buffer.array(), 0, read).trim();
        if (data.length() > 0) {
            System.out.println("Received message: " + data);
            if (data.equalsIgnoreCase("exit")) {
                client.close();
                System.out.println("Connection closed...");
            }
        }
    }

Java client:

/**
 * Connects to localhost:8089, writes four text messages back to back and
 * closes the connection. Any IOException is printed and ends the run.
 *
 * The messages are written without any delimiter or length prefix, so the
 * receiver may see several of them in a single read.
 */
public static void main(String[] args) {
        String[] messages = {"I like non-blocking servers", "Hello non-blocking world!", "One more message..", "exit"};
        System.out.println("Starting client...");

        // try-with-resources closes the channel even when a write throws.
        try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", 8089))) {
            for (String msg : messages) {
                System.out.println("Prepared message: " + msg);
                // Wrap the encoded bytes directly: no fixed 1024-byte limit on message size.
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                int bytesWritten = 0;
                // write() is allowed to send fewer bytes than requested; keep going until done.
                while (buffer.hasRemaining()) {
                    bytesWritten += client.write(buffer);
                }
                System.out.println(String.format("Sending Message: %s\nbufforBytes: %d", msg, bytesWritten));
            }
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        System.out.println("Client connection closed");
    }

I can’t explain the difference between Java and Kotlin, but I don’t see any code that wraps your data in some kind of message frame. Without this, you have no guarantees about how the data will arrive. It may arrive exactly as you sent it, it may arrive as one big message, it may arrive one char at a time, etc.

Thank you very much. I understand how it works now. But that makes me wonder even more why it worked using Java. The Kotlin code from my previous comments also worked once I let the current thread sleep after sending each message, even for 0 millis.

I’m now using 2 buffers: one for the length of the packet and one for the data itself. If anyone knows a more efficient way than the one I used, every comment is appreciated.

My server now:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel

/**
 * Per-client read state, attached to the client's selection key.
 *
 * @property lengthBuffer buffer that collects the length prefix of a message
 * @property buffer buffer that collects the message body; `null` while no body
 *   buffer has been allocated
 */
data class Connection(
    val lengthBuffer: ByteBuffer,
    var buffer: ByteBuffer?
)

/** Largest message body the server accepts; larger or negative length prefixes drop the connection. */
private const val MAX_FRAME_BYTES = 16 * 1024 * 1024

/**
 * Runs a single-threaded, selector-driven server on 127.0.0.1:1111.
 *
 * Every message on the wire is a 4-byte length prefix followed by that many
 * bytes of text. Complete messages are passed to [handleInput].
 */
fun main() {
    val selector = Selector.open()
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(InetSocketAddress("127.0.0.1", 1111))
    serverSocketChannel.configureBlocking(false)
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (serverSocketChannel.isOpen) {
        selector.select()
        val iterator = selector.selectedKeys().iterator()

        while (iterator.hasNext()) {
            val selectionKey = iterator.next()
            // Remove up front so that no code path can leave a stale key behind
            // or cut the handling of the other ready keys short.
            iterator.remove()

            when {
                // The key was cancelled (its channel closed) since it was selected.
                !selectionKey.isValid -> Unit
                selectionKey.isAcceptable -> acceptConnection(serverSocketChannel, selector)
                selectionKey.isReadable -> readFrames(selectionKey)
            }
        }
    }
}

/** Accepts one pending client, if any, and registers it for reads with fresh per-connection state. */
private fun acceptConnection(server: ServerSocketChannel, selector: Selector) {
    // In non-blocking mode accept() returns null when no connection is pending.
    val socketChannel = server.accept() ?: return
    socketChannel.configureBlocking(false)
    socketChannel.register(selector, SelectionKey.OP_READ, Connection(ByteBuffer.allocate(4), null))

    println("New connection")
}

/**
 * Reads as many complete messages as are currently available on the key's
 * channel. Partially received data stays in the key's [Connection] until the
 * next readable event.
 */
private fun readFrames(selectionKey: SelectionKey) {
    val connection = selectionKey.attachment() as Connection
    val channel = selectionKey.channel() as SocketChannel

    // handleInput() may close the channel, so re-check before every message.
    while (channel.isOpen) {
        val payload = connection.buffer ?: beginFrame(channel, connection) ?: return

        if (payload.hasRemaining() && channel.read(payload) < 0) {
            // End-of-stream in the middle of a message: the peer is gone.
            channel.close()
            return
        }
        if (payload.hasRemaining()) return

        connection.buffer = null
        handleInput(channel, String(payload.array()))
    }
}

/**
 * Reads the length prefix of the next message and allocates its body buffer.
 *
 * @return the new body buffer (also stored in [connection]), or `null` when
 *   the prefix is not complete yet or the channel was closed
 */
private fun beginFrame(channel: SocketChannel, connection: Connection): ByteBuffer? {
    val lengthBuffer = connection.lengthBuffer

    if (channel.read(lengthBuffer) < 0) {
        // End-of-stream: close, otherwise the key is reported readable forever.
        channel.close()
        return null
    }
    if (lengthBuffer.hasRemaining()) return null

    lengthBuffer.flip()
    val length = lengthBuffer.int
    // Ready for the next prefix; an allocated body buffer marks "reading body".
    lengthBuffer.clear()

    if (length !in 0..MAX_FRAME_BYTES) {
        // The prefix comes from the network: do not let a bad value crash the
        // server or make it allocate an arbitrary amount of memory.
        channel.close()
        return null
    }
    return ByteBuffer.allocate(length).also { connection.buffer = it }
}

/**
 * Prints one received [message] together with the sender's address and closes
 * [channel] when the message is exactly "exit".
 */
fun handleInput(channel: SocketChannel, message: String) {
    val sender = channel.remoteAddress
    println("$sender -> $message")
    when (message) {
        "exit" -> channel.close()
        else -> Unit
    }
}

My client(-s) now:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

/**
 * Opens four connections to 127.0.0.1:1111 one after another and sends three
 * messages on each. Every message goes out as one frame: a 4-byte length
 * prefix followed by the encoded text.
 */
fun main() {
    val messages = arrayOf("Hello world!", "What's up?", "exit")

    for (i in 0..3) {
        // use { } closes each socket when its messages are sent, or when a write throws.
        SocketChannel.open(InetSocketAddress("127.0.0.1", 1111)).use { socketChannel ->
            for (message in messages) {
                val byteArray = message.toByteArray()
                println(byteArray.size)

                // Length prefix and body share one buffer, so one write sends the frame.
                val frame = ByteBuffer.allocate(Int.SIZE_BYTES + byteArray.size)
                frame.putInt(byteArray.size)
                frame.put(byteArray)
                frame.flip()

                // write() is allowed to send fewer bytes than requested; keep going until done.
                while (frame.hasRemaining()) {
                    socketChannel.write(frame)
                }
            }
        }
    }
}

You should be able to use only one buffer, using putInt() and getInt().