In koolapp-stream, how can a handler or fun decide when to close its cursor?

Hi, I learned about koolapp the other day, and found it really KOOL!

Here is a question that came up while I was reading the README and the test code:

In SimpleStreamTest (https://github.com/koolapp/koolapp/blob/master/koolapp-stream/src/test/kotlin/test/koolapp/stream/SimpleStreamTest.kt#L15):

val stream = SimpleStream<String>()
val c1 = stream.open{ results += it }
stream.onNext("foo")
....
stream.onNext("bar")
c1.close()

Here the handler function is { results += it }, and
cursor c1 is closed by the enclosing function.
That made me wonder: can I control when to close c1 from INSIDE the handler function?
Something like:

stream.open { item, cursor -> results += item; if (item == "bar") cursor.close() }

My question is: is this scenario useful? Can it be implemented easily?

Thanks for your kind words :)

A Handler can close the stream at any time by keeping hold of the Cursor it's passed in the onOpen() method call and then calling its close() method. See the little section on this page about the Stream contract.

This is how take(5), for example, consumes 5 events from a stream and then closes it down. However, if you open a stream with just a function that processes each T event, you don’t have access to the Cursor, so you need to use a Handler object or a helper function to do that part for you.
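To make that concrete, here is a minimal sketch of a handler that keeps the Cursor it receives on open and closes it itself once it sees a given item. The Cursor, Handler, TinyStream and CloseAfter types below are hypothetical stand-ins written just for this example; they are not the real koolapp classes, and the real signatures may differ.

```kotlin
// Hypothetical stand-ins for the library types; names and signatures are assumptions.
interface Cursor { fun close() }

interface Handler<T> {
    fun onOpen(cursor: Cursor)
    fun onNext(item: T)
}

// Minimal stream, only enough to drive the example.
class TinyStream<T> {
    private val handlers = mutableListOf<Handler<T>>()

    fun open(handler: Handler<T>): Cursor {
        val cursor = object : Cursor {
            // Closing the cursor simply unregisters the handler.
            override fun close() { handlers.remove(handler) }
        }
        handlers.add(handler)
        handler.onOpen(cursor)
        return cursor
    }

    fun onNext(item: T) {
        // Iterate over a copy so a handler may close itself while being called.
        for (h in handlers.toList()) h.onNext(item)
    }
}

// A handler that remembers its cursor and closes it once it sees `last`.
class CloseAfter<T>(private val last: T, private val sink: MutableList<T>) : Handler<T> {
    private var cursor: Cursor? = null

    override fun onOpen(cursor: Cursor) { this.cursor = cursor }

    override fun onNext(item: T) {
        sink.add(item)
        if (item == last) cursor?.close()
    }
}

fun closeFromHandlerDemo(): List<String> {
    val results = mutableListOf<String>()
    val stream = TinyStream<String>()
    stream.open(CloseAfter("bar", results))
    stream.onNext("foo")
    stream.onNext("bar")
    stream.onNext("ignored") // not delivered: the handler already closed its cursor
    return results
}

fun main() {
    println(closeFromHandlerDemo()) // [foo, bar]
}
```

The caller never touches the cursor here; the decision to stop lives entirely inside the handler.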

Another option is that we could add an API where you process one of a number of sealed classes in a pattern-match style, so that your function could process an Open, Next, Error or Done object. It would be easy to add this facade as an adapter to the Handler class. Though it's a little cleaner if we can deal with closing the stream using a combinator that can be composed, e.g. by adding a take(n) or takeWhile() type method.
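As a rough illustration of the sealed-class idea, the sketch below wraps a single event-processing function in an adapter that implements a Handler. Nothing here exists in koolapp: Event, EventHandler and the Handler/Cursor interfaces are hypothetical names invented for the example, and the demo drives the handler by hand rather than through a real stream.

```kotlin
// Hypothetical stand-ins; the real library types may look different.
interface Cursor { fun close() }

interface Handler<T> {
    fun onOpen(cursor: Cursor)
    fun onNext(item: T)
    fun onError(e: Throwable)
    fun onComplete()
}

// Hypothetical event hierarchy mirroring the Open, Next, Error and Done cases.
sealed class Event<out T> {
    data class Open(val cursor: Cursor) : Event<Nothing>()
    data class Next<T>(val item: T) : Event<T>()
    data class Error(val cause: Throwable) : Event<Nothing>()
    object Done : Event<Nothing>()
}

// Adapter: turns one function over events into a Handler.
class EventHandler<T>(private val fn: (Event<T>) -> Unit) : Handler<T> {
    override fun onOpen(cursor: Cursor) = fn(Event.Open(cursor))
    override fun onNext(item: T) = fn(Event.Next(item))
    override fun onError(e: Throwable) = fn(Event.Error(e))
    override fun onComplete() = fn(Event.Done)
}

fun sealedEventDemo(): List<String> {
    val results = mutableListOf<String>()
    var cursor: Cursor? = null

    // One function handles every kind of event in pattern-match style.
    fun handle(event: Event<String>) {
        when (event) {
            is Event.Open -> cursor = event.cursor
            is Event.Next -> {
                results.add(event.item)
                if (event.item == "bar") cursor?.close()
            }
            is Event.Error -> results.add("error")
            Event.Done -> results.add("done")
        }
    }

    // Drive the handler by hand; `closed` records that close() was called.
    var closed = false
    val handler = EventHandler<String> { handle(it) }
    handler.onOpen(object : Cursor {
        override fun close() { closed = true }
    })
    for (item in listOf("foo", "bar", "baz")) {
        if (closed) break // a real stream would stop delivering after close()
        handler.onNext(item)
    }
    return results
}

fun main() {
    println(sealedEventDemo()) // [foo, bar]
}
```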

Thanks James for your clear explanation :) Now I understand what take and takeWhile are used for.

At first I couldn’t quite get my head around takeWhile().open(); maybe I’m still not really familiar with functional-style coding. I’ll try more code and do my homework :)

So in the SimpleStream Test, using takeWhile() instead of c1.close()

  val c1 = stream.takeWhile{ it != "another" }.open{ results += it }

I looked into the code and found that stream.takeWhile creates a new TakeWhileStream. I suppose this new stream is just a view layer on the original stream rather than a copy, right?

How does this new stream enforce thread safety with the original stream? Is there a performance issue with multiple layers of Stream if they are composed many times? Do we have inlining for this kind of abstraction?

Yes, your understanding is correct; in this case the composed streams (or handlers) are just simple, lightweight facades. It's rather like composing functions on collections in Kotlin, where filter() and map() are just simple function calls:

people.filter { it.country == "UK" }.map { it.name }

They are simple, lightweight functions which are efficient and easy to compose. While 'Stream' may be used in some of the implementation class names, there are no heavy underlying operating system resources like threads or sockets.
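To show how thin such a facade can be, here is a minimal sketch of a takeWhile wrapper that holds no data of its own: it opens the source stream with a wrapping function and closes the upstream cursor when the predicate first fails. The Stream and Cursor interfaces, TinyStream, and this TakeWhileStream are simplified stand-ins written for the example, not the actual koolapp implementation.

```kotlin
// Hypothetical, simplified stand-ins for the library types.
interface Cursor { fun close() }

interface Stream<T> {
    fun open(handler: (T) -> Unit): Cursor
}

// Minimal source stream, only enough to drive the example.
class TinyStream<T> : Stream<T> {
    private val handlers = mutableListOf<(T) -> Unit>()

    override fun open(handler: (T) -> Unit): Cursor {
        handlers.add(handler)
        return object : Cursor {
            override fun close() { handlers.remove(handler) }
        }
    }

    fun onNext(item: T) {
        // Iterate over a copy so a handler can be removed while being called.
        for (h in handlers.toList()) h(item)
    }
}

// The facade: no buffer and no copy, just a wrapper around the source.
class TakeWhileStream<T>(
    private val source: Stream<T>,
    private val predicate: (T) -> Boolean
) : Stream<T> {
    override fun open(handler: (T) -> Unit): Cursor {
        var upstream: Cursor? = null
        val cursor = source.open { item ->
            // Forward matching items; close the source cursor on the first miss.
            if (predicate(item)) handler(item) else upstream?.close()
        }
        upstream = cursor
        return cursor
    }
}

fun <T> Stream<T>.takeWhile(predicate: (T) -> Boolean): Stream<T> =
    TakeWhileStream(this, predicate)

fun takeWhileDemo(): List<String> {
    val results = mutableListOf<String>()
    val stream = TinyStream<String>()
    stream.takeWhile { it != "another" }.open { results.add(it) }
    stream.onNext("foo")
    stream.onNext("bar")
    stream.onNext("another") // predicate fails here, so the cursor is closed
    stream.onNext("late")    // not delivered
    return results
}

fun main() {
    println(takeWhileDemo()) // [foo, bar]
}
```

Each extra combinator in a chain adds one more function call per event in this sketch, and nothing else.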

From a threading perspective, the contract of a stream is that it will only invoke a specific Handler object from one thread at a time; but which thread is used is up to the Stream to decide. The facade streams created using combinator methods like take() just act as a method call in a pipeline, so they are a synchronous function call within the call to the Handler by the topmost Stream. E.g. if you created a Stream using a java.util.Timer, then all the function calls would be inside that timer thread and all the functions would complete within the timer stream's calls to its Handler.
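The timer case can be sketched with plain functions, without any stream classes at all. In the example below, mapStage is a hypothetical helper that wraps the next stage in a function; a java.util.Timer drives the pipeline, and every stage records the name of the thread it runs on. Under these assumptions, all stages should report the timer's thread, since each one is just a synchronous call made from inside the timer task.

```kotlin
import java.util.Collections
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: a stage is a plain function wrapping the next one.
fun <T, R> mapStage(transform: (T) -> R, next: (R) -> Unit): (T) -> Unit =
    { item -> next(transform(item)) }

fun timerThreadDemo(): Set<String> {
    val seen = Collections.synchronizedSet(mutableSetOf<String>())
    val latch = CountDownLatch(3)

    // Final stage: record the calling thread and count the event.
    val sink: (Int) -> Unit = { _ ->
        seen.add(Thread.currentThread().name)
        latch.countDown()
    }

    // Intermediate stage: also records the thread it runs on.
    val pipeline: (Int) -> Unit = mapStage({ n: Int ->
        seen.add(Thread.currentThread().name)
        n * 2
    }, sink)

    // The timer thread is the only thread that ever calls the pipeline.
    val timer = Timer("tick-timer", true)
    var counter = 0
    timer.schedule(object : TimerTask() {
        override fun run() { pipeline(counter++) }
    }, 0L, 10L)

    latch.await(2, TimeUnit.SECONDS) // wait for at least three ticks
    timer.cancel()
    return synchronized(seen) { seen.toSet() }
}

fun main() {
    println(timerThreadDemo()) // [tick-timer]
}
```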

Thanks for clarifying my questions. Now I know I can safely compose stream facades without worries :) Thanks, and indeed this design is kool !