How can we create an infinite, writable and readable stream in Kotlin?


#1

Hello everyone,

Today I am learning how to work with streams in Kotlin, and a question came up: is there a way to create an empty stream that is infinite, writable, and readable? For example, I want to create something like: stream = EmptyInfiniteStream()

I want this stream to be both writable and readable. In other words, one thread can write to it with stream.write(data), and another thread can read from it with val a: T = stream.next(). If nothing is available at that moment, the read should block until something arrives. Is such a thing possible in Kotlin?

The use case is this: I have a utility class that takes a stream as input, but my Consumer class does not produce one. Instead, it implements a method like ConsumeEvent(event), which the data producer calls whenever it wants to send data. I therefore want to feed the events this method receives into a stream so that my utility class can work with them. In other words, I want to implement the ConsumeEvent method like this:

override fun ConsumeEvent(event: Event) { stream.write(event) }

One workaround I can think of is socket programming through an intermediate port, but that seems like a bad solution. It would be great if Kotlin had a library that could do this without sockets. Do you know any way to achieve it?

Thank you so much.


#2

The JVM does not provide this functionality out of the box (unless you count queues), but you can use Channels from the kotlinx.coroutines library. They are great!
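To make the Channel suggestion concrete, here is a minimal sketch of how the callback from the question might be bridged to a channel. It assumes the kotlinx-coroutines-core dependency is on the classpath. Event, ChannelConsumer, and collectEvents are hypothetical names invented for this illustration, not part of any library or of the original poster's code.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical event type standing in for the poster's Event class.
data class Event(val payload: String)

// Hypothetical consumer: the producer calls ConsumeEvent, readers pull from `events`.
class ChannelConsumer {
    // UNLIMITED capacity, so trySend does not fail for lack of buffer space.
    val events = Channel<Event>(Channel.UNLIMITED)

    // Non-suspending, so it can be called from any thread or callback.
    fun ConsumeEvent(event: Event) {
        events.trySend(event)
    }

    // Closing lets the reader's loop finish once buffered events are drained.
    fun close() {
        events.close()
    }
}

// Sends `count` events from a plain thread and collects them in a coroutine.
fun collectEvents(count: Int): List<String> = runBlocking {
    val consumer = ChannelConsumer()
    val received = mutableListOf<String>()

    val reader = launch {
        // Suspends while the channel is empty; the loop ends when it is closed.
        for (event in consumer.events) received += event.payload
    }

    val producer = Thread {
        repeat(count) { consumer.ConsumeEvent(Event("event-$it")) }
        consumer.close()
    }
    producer.start()

    reader.join()
    producer.join()
    received
}
```

Calling collectEvents(3) returns the three payloads in the order they were sent. In a real application the channel would usually stay open for as long as the producer lives, so the reader would simply wait whenever no events are pending.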


#3

I don’t know if there’s a Kotlin version of it or not, but I’ve done this in Kotlin using Java’s PipedInputStream and PipedOutputStream like so:

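import java.io.PipedInputStream
import java.io.PipedOutputStream

val writer = PipedOutputStream()
val reader = PipedInputStream(writer)

// Write to the stream (typically from a producer thread)
writer.write(...)

// Read from the stream (from a different thread); read() blocks until a byte is available.
// For line-based reads, wrap it first: reader.bufferedReader().readLine()
reader.read()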
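As a fuller illustration of the piped-stream approach, the following sketch writes lines from one thread and reads them on another. The helper name pipeLines is hypothetical and chosen only for this example. Everything else comes from java.io and the Kotlin standard library.

```kotlin
import java.io.PipedInputStream
import java.io.PipedOutputStream
import kotlin.concurrent.thread

// Writes each line from a separate thread and reads them back on the calling thread.
fun pipeLines(lines: List<String>): List<String> {
    val output = PipedOutputStream()
    val input = PipedInputStream(output)

    // The writer runs on its own thread: the pipe has a fixed-size buffer,
    // so a single thread that both writes and reads can deadlock.
    val writerThread = thread {
        // use {} closes the writer, which signals end-of-stream to the reader.
        output.bufferedWriter().use { writer ->
            for (line in lines) {
                writer.write(line)
                writer.newLine()
            }
        }
    }

    // readLines() blocks while waiting for data and returns once the writer side is closed.
    val received = input.bufferedReader().use { it.readLines() }
    writerThread.join()
    return received
}
```

Note that piped streams carry bytes, so typed objects such as the Event from the question would need to be serialized on the way in and parsed on the way out. For passing objects directly, a queue or a channel is likely a better fit.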