Hey, I wanted to try out networking in Kotlin. I tried to use code I got from a website, but it was Java. With Java it works, but with Kotlin it doesn’t, because the messages get combined. Any help or explanation of why this is happening is appreciated.
Kotlin output:
Connection Accepted...
Received message: I like non-blocking servers
Received message: Hello non-blocking world!One more message...exit
Java output:
Connection Accepted...
Reading...
Received message: I like non-blocking servers
Reading...
Received message: Hello non-blocking world!
Reading...
Received message: One more message..
Reading...
Received message: exit
Connection closed...
Could you post the code here please? (I see nothing at those links. Plus, external links can change or be taken down, while discussions here could continue to be useful to people for a long time.)
Kotlin server:
private var running = false
private var selector: Selector? = null
fun start() {
    selector = Selector.open()
    // We have to set connection host, port and non-blocking mode
    val socketChannel = ServerSocketChannel.open()
    socketChannel.socket().bind(InetSocketAddress("localhost", 8089))
    socketChannel.configureBlocking(false)
    socketChannel.register(selector, SelectionKey.OP_ACCEPT)
    val ops = socketChannel.validOps()
    socketChannel.register(selector, ops, null)
    while (true) {
        selector!!.select(1000)
        val selectedKeys = selector!!.selectedKeys()
        val i = selectedKeys.iterator()
        while (i.hasNext()) {
            val key = i.next()
            if (key.isAcceptable) {
                // New client has been accepted
                handleAccept(socketChannel, key)
            } else if (key.isReadable) {
                // We can run non-blocking operation READ on our client
                handleRead(key)
            }
            i.remove()
        }
    }
}
private fun handleAccept(
    mySocket: ServerSocketChannel,
    key: SelectionKey
) {
    println("Connection Accepted...")
    // Accept the connection and set non-blocking mode
    val client = mySocket.accept()
    client.configureBlocking(false)
    // Register the client channel with the selector for read events
    client.register(selector!!, SelectionKey.OP_READ)
}
private fun handleRead(key: SelectionKey) {
    // Get the client's SocketChannel from the key
    val client = key.channel() as SocketChannel
    // Create buffer to read data
    val buffer = ByteBuffer.allocate(1024)
    val read = client.read(buffer)
    if (read > 0) {
        // Parse data from buffer to String
        val data = String(buffer.array()).trim { it <= ' ' }
        if (data.isNotEmpty()) {
            println("Received message: $data")
            if (data.equals("exit", ignoreCase = true)) {
                client.close()
                println("Connection closed...")
            }
        }
    }
}
Kotlin client:
fun main() {
    val messages = arrayOf("I like non-blocking servers", "Hello non-blocking world!", "One more message...", "exit")
    println("Starting client...")
    val client = SocketChannel.open(InetSocketAddress("localhost", 8089))
    for (msg in messages) {
        println("Prepared message: $msg")
        val buffer = ByteBuffer.allocate(1024)
        buffer.put(msg.toByteArray())
        buffer.flip()
        val bytesWritten = client.write(buffer)
        println("Sending Message: $msg with size $bytesWritten")
    }
    client.close()
    println("Client connection closed")
}
Java server:
private static Selector selector = null;
public static void main(String[] args) {
    try {
        selector = Selector.open();
        // We have to set connection host, port and non-blocking mode
        ServerSocketChannel socket = ServerSocketChannel.open();
        ServerSocket serverSocket = socket.socket();
        serverSocket.bind(new InetSocketAddress("localhost", 8089));
        socket.configureBlocking(false);
        int ops = socket.validOps();
        socket.register(selector, ops, null);
        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> i = selectedKeys.iterator();
            while (i.hasNext()) {
                SelectionKey key = i.next();
                if (key.isAcceptable()) {
                    // New client has been accepted
                    handleAccept(socket, key);
                } else if (key.isReadable()) {
                    // We can run non-blocking operation READ on our client
                    handleRead(key);
                }
                i.remove();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
private static void handleAccept(ServerSocketChannel mySocket,
        SelectionKey key) throws IOException {
    System.out.println("Connection Accepted...");
    // Accept the connection and set non-blocking mode
    SocketChannel client = mySocket.accept();
    client.configureBlocking(false);
    // Register the client channel with the selector for read events
    client.register(selector, SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key)
        throws IOException {
    System.out.println("Reading...");
    // Get the client's SocketChannel from the key
    SocketChannel client = (SocketChannel) key.channel();
    // Create buffer to read data
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    client.read(buffer);
    // Parse data from buffer to String
    String data = new String(buffer.array()).trim();
    if (data.length() > 0) {
        System.out.println("Received message: " + data);
        if (data.equalsIgnoreCase("exit")) {
            client.close();
            System.out.println("Connection closed...");
        }
    }
}
I can’t explain the difference between Java and Kotlin, but I don’t see any code that would wrap your data into some kind of message frames. Without this, you don’t have any guarantees on how the data will arrive. It may arrive exactly as you sent it, it may arrive as one big message, it may arrive one char at a time, etc.
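To make the idea of message frames concrete, here is a minimal sketch of one common scheme: a 4-byte length prefix in front of every payload. The helper name `drainFrames` and the accumulate-then-drain approach are assumptions for illustration, not something taken from the code posted above. It has no size limit or validation beyond a non-negative length, so treat it as a starting point only.

```kotlin
import java.nio.ByteBuffer

// Hypothetical helper: pulls every complete length-prefixed frame out of `acc`.
// `acc` must be in write mode on entry (bytes were just read into it) and is
// left in write mode on return, with any partial frame kept at the front.
fun drainFrames(acc: ByteBuffer): List<String> {
    val frames = mutableListOf<String>()
    acc.flip()                                   // switch to reading what has arrived so far
    while (acc.remaining() >= 4) {
        acc.mark()
        val length = acc.int                     // 4-byte big-endian length prefix
        require(length >= 0) { "negative frame length" }
        if (acc.remaining() < length) {
            acc.reset()                          // body incomplete: un-read the prefix
            break
        }
        val body = ByteArray(length)
        acc.get(body)
        frames += String(body, Charsets.UTF_8)
    }
    acc.compact()                                // keep leftover bytes, back to write mode
    return frames
}

fun main() {
    // Build the wire bytes for two frames, then feed them in two arbitrary chunks.
    val wire = ByteBuffer.allocate(64)
    for (m in listOf("Hello", "exit")) {
        val b = m.toByteArray(Charsets.UTF_8)
        wire.putInt(b.size).put(b)
    }
    val bytes = wire.array().copyOf(wire.position())

    val acc = ByteBuffer.allocate(1024)
    acc.put(bytes, 0, 6)                         // length prefix plus 2 bytes of "Hello"
    println(drainFrames(acc))                    // no complete frame yet
    acc.put(bytes, 6, bytes.size - 6)            // the rest arrives
    println(drainFrames(acc))                    // both frames are now complete
}
```

In a selector loop, one such accumulation buffer per connection (for example as the key's attachment) would be filled by `channel.read(acc)` and then passed to the helper. A frame larger than the buffer's capacity would never complete, so a real server would want an upper bound on the length.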
Thank you very much. I understand how it works now. But that makes me wonder even more why it worked using Java. The code I posted in the previous comments also worked with Kotlin when I let the current thread sleep after sending each message, even for 0 millis.
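One likely explanation (an assumption, not something verified against this exact setup) is timing: when the reader gets a chance to run between two writes, each read tends to see one message, and when the writer runs ahead, several messages are already waiting and a single read picks them all up. The sketch below imitates that with a `java.nio.channels.Pipe` standing in for the socket; the helper name `chunksSeen` is hypothetical. How the chunks are split depends on the platform and on timing, so the only thing you can rely on is that the bytes arrive in order.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.Pipe

// Hypothetical helper: writes each message, optionally reading after every write,
// and returns the chunks exactly as the individual reads delivered them.
fun chunksSeen(messages: List<String>, readBetweenWrites: Boolean): List<String> {
    val pipe = Pipe.open()
    val chunks = mutableListOf<String>()
    val total = messages.sumOf { it.toByteArray().size }
    var received = 0

    fun readOnce() {
        val buffer = ByteBuffer.allocate(1024)
        val n = pipe.source().read(buffer)           // blocks until at least one byte is available
        if (n > 0) {
            received += n
            chunks += String(buffer.array(), 0, n)   // decode only the bytes actually read
        }
    }

    for (m in messages) {
        val out = ByteBuffer.wrap(m.toByteArray())
        while (out.hasRemaining()) pipe.sink().write(out)
        if (readBetweenWrites) readOnce()            // the reader keeps pace with the writer
    }
    while (received < total) readOnce()              // drain whatever has not been read yet
    pipe.sink().close()
    pipe.source().close()
    return chunks
}

fun main() {
    val messages = listOf("Hello non-blocking world!", "One more message...", "exit")
    println(chunksSeen(messages, readBetweenWrites = true))
    println(chunksSeen(messages, readBetweenWrites = false))
}
```

Whatever the two lines print on a given machine, joining the chunks always gives the same byte sequence, which is why the receiver needs its own framing rather than relying on how the reads happen to split.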
I’ve used 2 buffers now: one for the length of the packet and one for the data itself. If anyone knows a more efficient way than how I did it, every comment is appreciated.
My server now:
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel
data class Connection(val lengthBuffer: ByteBuffer, var buffer: ByteBuffer?)
fun main() {
    val selector = Selector.open()
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(InetSocketAddress("127.0.0.1", 1111))
    serverSocketChannel.configureBlocking(false)
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
    while (serverSocketChannel.isOpen) {
        selector.select()
        val selectedKeys = selector.selectedKeys()
        val iterator = selectedKeys.iterator()
        while (iterator.hasNext()) {
            val selectionKey = iterator.next()
            // Remove the key right away so no code path below can skip it
            iterator.remove()
            if (selectionKey.isAcceptable) {
                val socketChannel = serverSocketChannel.accept()
                socketChannel.configureBlocking(false)
                socketChannel.register(selector, SelectionKey.OP_READ, Connection(ByteBuffer.allocate(4), null))
                println("New connection")
            } else if (selectionKey.isReadable) {
                val connection = selectionKey.attachment() as Connection
                val lengthBuffer = connection.lengthBuffer
                val channel = selectionKey.channel() as SocketChannel
                if (lengthBuffer.hasRemaining()) {
                    // Still collecting the 4-byte length prefix
                    if (channel.read(lengthBuffer) == -1) {
                        channel.close() // the peer closed the connection
                    }
                    // If body bytes are already waiting, the next select() pass picks them up
                } else {
                    if (connection.buffer == null) {
                        lengthBuffer.flip()
                        connection.buffer = ByteBuffer.allocate(lengthBuffer.int)
                    }
                    val buffer = connection.buffer
                    if (buffer != null) {
                        if (channel.read(buffer) == -1) {
                            channel.close() // the peer closed the connection mid-message
                        } else if (!buffer.hasRemaining()) {
                            // Whole body received: hand it over and reset for the next frame
                            buffer.flip()
                            handleInput(channel, String(buffer.array()))
                            connection.buffer = null
                            lengthBuffer.clear()
                        }
                    }
                }
            }
        }
    }
}
fun handleInput(channel: SocketChannel, message: String) {
    println("${channel.remoteAddress} -> $message")
    if (message == "exit") {
        channel.close()
    }
}
My client(-s) now:
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
fun main() {
    for (i in 0..3) {
        val socketChannel = SocketChannel.open(InetSocketAddress("127.0.0.1", 1111))
        val messages = arrayOf("Hello world!", "What's up?", "exit")
        for (message in messages) {
            val lengthBuffer = ByteBuffer.allocate(4)
            val byteArray = message.toByteArray()
            lengthBuffer.putInt(byteArray.size)
            println(byteArray.size)
            lengthBuffer.flip()
            socketChannel.write(lengthBuffer)
            val buffer = ByteBuffer.wrap(byteArray)
            socketChannel.write(buffer)
        }
        // Release the socket once all messages for this connection are sent
        socketChannel.close()
    }
}
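On the question of a more efficient way: one option on the sending side is to put the length and the payload into a single buffer, so each message needs one allocation and usually one `write` call. This is only a sketch under that assumption; the helper names `frame` and `sendFrame` are made up for illustration. The loop around `write` matters mainly for non-blocking channels, where a single call may accept only part of the buffer.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.WritableByteChannel

// Hypothetical helper: one buffer holding the 4-byte length followed by the payload.
fun frame(message: String): ByteBuffer {
    val payload = message.toByteArray(Charsets.UTF_8)
    val buffer = ByteBuffer.allocate(4 + payload.size)
    buffer.putInt(payload.size).put(payload)
    buffer.flip()                                // ready to be written out
    return buffer
}

// Hypothetical helper: write() may take only part of the buffer, so loop until it is empty.
fun sendFrame(channel: WritableByteChannel, message: String) {
    val buffer = frame(message)
    while (buffer.hasRemaining()) channel.write(buffer)
}

fun main() {
    // An in-memory channel stands in for the SocketChannel here.
    val sink = ByteArrayOutputStream()
    sendFrame(Channels.newChannel(sink), "exit")
    println(sink.size())                         // 4 length bytes + 4 payload bytes = 8
}
```

In the client above, the body of the inner loop would then shrink to `sendFrame(socketChannel, message)`, since `SocketChannel` implements `WritableByteChannel`.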