I want to split a sequence into chunks, not based on the number of elements but on a certain criterion. Think of a sorted list of names that should be split into sublists by their first character.
After some research into how to write an extension for sequences, I came up with the following solution:
```kotlin
fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    if (!underlyingSequence.iterator().hasNext())
        return emptySequence()
    return sequence {
        val buffer = mutableListOf<T>()
        var last: T? = null
        underlyingSequence.forEach { current ->
            val shouldSplit = last?.let { predicate(it, current) } ?: false
            last = current
            if (shouldSplit) {
                yield(buffer.toList())
                buffer.clear()
            }
            buffer.add(current)
        }
        yield(buffer)
    }
}

sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
    .chunked { last, current -> last[0] != current[0] }
    .forEach { println(it) }
// prints:
// [Adelheid, Anna, Annika]
// [Berthold, Bertram, Borat]
// [Carl]
```
Do you see anything that could be improved?
Since I doubt that I’m the only one with such a requirement, I could imagine that such a method would be a good addition to the standard library. What do you think?
I’ve thought about groupBy, but I don’t think it is a good solution for a large collection, since it has to hold all elements of the sequence in memory. The whole point of using sequences in my case is to avoid holding all elements in memory; otherwise the server would crash with an OutOfMemoryError.
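To make the memory argument concrete, here is a small sketch (not from the thread) that counts how many elements an operation pulls from a lazily generated source. It uses the standard library's size-based `Sequence.chunked(size)` only as a stand-in for any lazy chunking operator; `countPulled` is a hypothetical helper name. `groupBy` is terminal, so it reads the whole source and keeps every element in the returned `Map`, whereas the lazy operator reads only what the consumer asks for.

```kotlin
// Hypothetical helper: counts how many elements `consume` pulls from a
// lazily generated source of 100,000 integers.
fun countPulled(consume: (Sequence<Int>) -> Unit): Int {
    var pulled = 0
    val source = generateSequence(1) { it + 1 }
        .take(100_000)
        .onEach { pulled++ } // runs once per element actually pulled
    consume(source)
    return pulled
}

fun main() {
    // groupBy is a terminal operation: it reads the whole source and
    // keeps every element in the resulting Map.
    val byGroupBy = countPulled { seq -> seq.groupBy { it / 1000 } }

    // A lazy operator only reads what the consumer asks for; here the
    // stdlib's size-based chunked(1000) stops after the first chunk.
    val byChunked = countPulled { seq -> seq.chunked(1000).first() }

    println("groupBy pulled $byGroupBy elements")      // 100000
    println("chunked(1000).first() pulled $byChunked") // 1000
}
```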
The code above works in my artificial test but not in practice. I get the following exception:
```
java.lang.IllegalStateException: stream has already been operated upon or closed
```
The problem is the hasNext check: calling iterator() already starts the iteration, and forEach then iterates a second time, but the sequence returned from a JPA query can be iterated only once.
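The failure can be reproduced without a database. A minimal sketch, assuming the standard library's `Sequence.constrainOnce()` as a stand-in for the single-use JPA result stream: it throws `IllegalStateException` (with a different message than the JPA stream) when iterated a second time. `oneShotNames` and `checkThenIterate` are hypothetical names; `checkThenIterate` mirrors the structure of the first version, an emptiness check followed by a full iteration.

```kotlin
// Hypothetical stand-in for a JPA result stream: constrainOnce() makes the
// sequence throw IllegalStateException if it is iterated a second time.
fun oneShotNames(): Sequence<String> =
    sequenceOf("Adelheid", "Anna", "Berthold").constrainOnce()

// Mirrors the structure of the first version: an emptiness check that
// already calls iterator(), followed by a second, full iteration.
fun checkThenIterate(source: Sequence<String>): List<String> {
    if (!source.iterator().hasNext()) return emptyList() // 1st iteration
    return source.toList()                               // 2nd iteration: throws
}

fun main() {
    // Iterating the one-shot sequence once works fine.
    println(oneShotNames().toList())

    // Check-then-iterate fails on the second call to iterator().
    val failure = runCatching { checkThenIterate(oneShotNames()) }.exceptionOrNull()
    println(failure is IllegalStateException) // true
}
```

The same `constrainOnce()` trick is handy in unit tests for any sequence operator that is meant to work on single-use sources.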
The improved version:
```kotlin
fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    return sequence {
        val buffer = mutableListOf<T>()
        var last: T? = null
        for (current in underlyingSequence) {
            val shouldSplit = last?.let { predicate(it, current) } ?: false
            if (shouldSplit) {
                yield(buffer.toList())
                buffer.clear()
            }
            buffer.add(current)
            last = current
        }
        if (buffer.isNotEmpty()) {
            yield(buffer)
        }
    }
}
```
The final if prevents yielding an empty list: if the underlying sequence is empty, the result should stay empty, and chunked shouldn’t create an artificial, superfluous empty list.
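One possible refinement (a suggestion, not from the thread): while the buffer is non-empty, its last element is always the previous element, so the separate `last` variable can be dropped. This also covers an edge case of the version above: with a nullable element type, `last?.let` treats a `null` element as "no previous element", so the predicate is skipped right after a `null`. Yielding the buffer and starting a fresh list also avoids the `toList()` copy. The sketch keeps the thread's name `chunked` and uses `constrainOnce()` to simulate a single-use source.

```kotlin
// Suggested variant of the improved version (not from the thread).
// The previous element is always buffer.last() while the buffer is non-empty,
// so no separate nullable `last` variable is needed.
fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    return sequence {
        var buffer = mutableListOf<T>()
        for (current in underlyingSequence) { // single iteration: safe for one-shot sources
            if (buffer.isNotEmpty() && predicate(buffer.last(), current)) {
                yield(buffer)            // hand over the finished chunk...
                buffer = mutableListOf() // ...and start a new one instead of copying
            }
            buffer.add(current)
        }
        if (buffer.isNotEmpty()) {
            yield(buffer) // an empty source yields no chunks at all
        }
    }
}

fun main() {
    // constrainOnce() simulates a source that can be iterated only once.
    sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
        .constrainOnce()
        .chunked { last, current -> last[0] != current[0] }
        .forEach { println(it) }

    // Null elements are compared like any other element.
    println(sequenceOf(null, null, 1, 1, null).chunked { a, b -> a != b }.toList())
    // [[null, null], [1, 1], [null]]
}
```

With the `last?.let` version, the same nullable input would produce `[[null, null, 1, 1], [null]]`, because the predicate is never called when the previous element is `null`.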
What concerns me about the proposed signature is that it covers only the particular use case where the predicate can decide whether to start a new chunk based on the current and previous elements alone.
@ilya.gorbunov Yes, my use case might be too specific for a general-purpose function in the standard library. What do you think of a signature like the following?
The predicate would receive a window of at most windowSize elements, and the resulting chunks would contain at most chunkMaxSize elements, where -1 (or maybe null) would mean “unlimited”.
@ilya.gorbunov Reactor provides the method bufferUntilChanged for this use case. It is even more restricted than my first version, but it seems good enough for many use cases.
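For comparison, a bufferUntilChanged-style operator for `Sequence` could look like the sketch below. `chunkedUntilChanged` is a hypothetical name, not an existing Kotlin or Reactor API. It starts a new chunk whenever an element's key differs from the previous element's key, which is the predicate-based version specialized to `{ a, b -> key(a) != key(b) }`.

```kotlin
// Hypothetical bufferUntilChanged-style operator for Sequence (not an
// existing Kotlin or Reactor API). A new chunk starts whenever the key of
// the current element differs from the key of the previous one.
fun <T, K> Sequence<T>.chunkedUntilChanged(keySelector: (T) -> K): Sequence<List<T>> {
    val source = this
    return sequence {
        var buffer = mutableListOf<T>()
        var bufferKey: K? = null // key of the buffered elements; only read when buffer is non-empty
        for (element in source) {
            val key = keySelector(element) // each key is computed exactly once
            if (buffer.isNotEmpty() && key != bufferKey) {
                yield(buffer)
                buffer = mutableListOf()
            }
            buffer.add(element)
            bufferKey = key
        }
        if (buffer.isNotEmpty()) {
            yield(buffer)
        }
    }
}

fun main() {
    sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
        .chunkedUntilChanged { it.first() }
        .forEach { println(it) }

    // Only adjacent elements are grouped, unlike groupBy:
    println(sequenceOf(1, 1, 2, 1).chunkedUntilChanged { it }.toList()) // [[1, 1], [2], [1]]
}
```

Storing the current key means the key selector runs once per element; writing the same thing as a two-argument predicate would evaluate it roughly twice per element, which matters only if the key is expensive to compute.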
Reactor is, by the way, a great source of inspiration for methods that would be useful for Kotlin Flow.
I’ve actually thought of filing an issue for each Flux operator, or at least for each operator name (so as not to be overwhelmed by all the signature variants). What do you think?
Flow seems to be a great tool in principle. If I remember a benchmark correctly, it should be significantly faster than Reactor or RxJava, but it has very few operators at the moment.
My idea of filing an issue for (more or less) every Flux operator goes against the goals of the Flow developers: Flow should stay small and only have highly relevant operators.
Maybe it is better to create a separate operator library, like the creator of RxJava did: