Splitting a sequence into chunks based on a predicate

I want to split a sequence into chunks, not based on the number of elements but on a certain criterion. Think of a sorted list of names that should be split into sublists depending on the first character.

After some research into how to write an extension for sequences, I came up with the following solution:

fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    if (!underlyingSequence.iterator().hasNext())
        return emptySequence()
    return sequence {
        val buffer = mutableListOf<T>()
        var last: T? = null
        underlyingSequence.forEach { current ->
            val shouldSplit = last?.let { predicate(it, current) } ?: false
            last = current
            if (shouldSplit) {
                yield(buffer.toList())
                buffer.clear()
            }
            buffer.add(current)
        }
        yield(buffer)
    }
}


sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
    .chunked { last, current -> last[0] != current[0] }
    .forEach { println(it) }
// prints:
// [Adelheid, Anna, Annika]
// [Berthold, Bertram, Borat]
// [Carl]

Do you see anything that could be improved?

Since I doubt that I’m the only one with such a requirement, I could imagine that such a method would be a good addition to the standard library. What do you think?


Maybe try groupBy with a map afterwards?


I’ve thought about groupBy, but I don’t think it would be a good solution for a large collection, since it needs to hold all elements from the sequence in memory. The whole purpose of sequences in my case is to avoid holding all elements in memory, because otherwise the server would crash with an OutOfMemoryError.
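To illustrate the memory concern, here is a minimal sketch of the groupBy approach (the helper name groupApproach is made up for this example): groupBy on a Sequence is a terminal operation that returns a Map, so it must pull every element into memory before the groups can be processed.

```kotlin
// Hypothetical sketch: groupBy on a Sequence is terminal and returns a
// Map<K, List<T>>, so all elements are held in memory at once.
fun groupApproach(names: Sequence<String>): List<List<String>> =
    names.groupBy { it[0] }   // materializes the whole sequence here
        .values
        .toList()

fun main() {
    val names = sequenceOf("Adelheid", "Anna", "Berthold", "Carl")
    println(groupApproach(names))
    // [[Adelheid, Anna], [Berthold], [Carl]]
}
```

The grouping itself is correct; the problem is purely that the intermediate Map grows with the input.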

I’ve created an issue to promote this function for the standard library.
https://youtrack.jetbrains.com/issue/KT-41648

The code above works in my artificial test but not in practice. I get the following exception:

java.lang.IllegalStateException: stream has already been operated upon or closed

The problem is the hasNext check, since that already starts the iteration, and then the forEach iterates a second time; but the concrete sequence returned from a JPA query can be iterated only once.

The improved version:

fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    return sequence {
        val buffer = mutableListOf<T>()
        var last: T? = null
        for (current in underlyingSequence) {
            val shouldSplit = last?.let { predicate(it, current) } ?: false
            if (shouldSplit) {
                yield(buffer.toList())
                buffer.clear()
            }
            buffer.add(current)
            last = current
        }
        if (buffer.isNotEmpty()) {
            yield(buffer)
        }
    }
}

The final if suppresses yielding an empty list: if the underlying sequence was empty, the result should stay empty, and chunked shouldn’t create an artificial, superfluous empty list.
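As a sanity check for the single-pass behavior, the improved version can be exercised against constrainOnce(), which, like the JPA-backed sequence, throws if anything tries to iterate it a second time (the function is repeated here so the snippet is self-contained):

```kotlin
// The improved chunked from above, unchanged.
fun <T> Sequence<T>.chunked(predicate: (T, T) -> Boolean): Sequence<List<T>> {
    val underlyingSequence = this
    return sequence {
        val buffer = mutableListOf<T>()
        var last: T? = null
        for (current in underlyingSequence) {
            val shouldSplit = last?.let { predicate(it, current) } ?: false
            if (shouldSplit) {
                yield(buffer.toList())
                buffer.clear()
            }
            buffer.add(current)
            last = current
        }
        if (buffer.isNotEmpty()) {
            yield(buffer)
        }
    }
}

fun main() {
    // constrainOnce() simulates a sequence that permits only one pass.
    val once = sequenceOf("Adelheid", "Anna", "Berthold").constrainOnce()
    println(once.chunked { last, current -> last[0] != current[0] }.toList())
    // [[Adelheid, Anna], [Berthold]]

    // An empty input stays empty; no artificial empty chunk.
    println(emptySequence<String>().chunked { a, b -> a != b }.toList())
    // []
}
```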


What concerns me about the proposed signature is that it covers only the particular use case where the predicate can decide whether to start a new chunk based solely on the current and previous elements.

I think other cases of custom splitting should be considered in order to decide on the signature. One example that comes to mind is splitting weighted items into buckets so that the total weight of the items in one bucket is not greater than a given value: Kotlin - How to split list of strings based on the total length of characters - Stack Overflow
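That weighted-bucket case could look something like the following sketch (the names chunkedByWeight, maxWeight, and weight are invented here, and the cut-off semantics are one possible interpretation): a new chunk starts whenever adding the next element would push the running total over the limit.

```kotlin
// Hypothetical sketch: split a sequence into buckets whose total
// weight does not exceed maxWeight (an element heavier than maxWeight
// still gets a bucket of its own).
fun <T> Sequence<T>.chunkedByWeight(
    maxWeight: Int,
    weight: (T) -> Int
): Sequence<List<T>> = sequence {
    val buffer = mutableListOf<T>()
    var total = 0
    for (element in this@chunkedByWeight) {
        val w = weight(element)
        if (buffer.isNotEmpty() && total + w > maxWeight) {
            yield(buffer.toList())
            buffer.clear()
            total = 0
        }
        buffer.add(element)
        total += w
    }
    if (buffer.isNotEmpty()) yield(buffer)
}

fun main() {
    // Split strings so that each bucket holds at most 10 characters.
    println(sequenceOf("foo", "barbaz", "qux")
        .chunkedByWeight(10) { it.length }
        .toList())
    // [[foo, barbaz], [qux]]
}
```

A predicate of type (T, T) -> Boolean cannot express this, because the split decision depends on the accumulated state of the current chunk, not just on two adjacent elements.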


@ilya.gorbunov Yes, my use case might be too specific for a general-purpose function in the standard library. What do you think of a signature like the following?

fun <T> Sequence<T>.chunked(
    windowSize: Int, 
    chunkMaxSize: Int, 
    predicate: (List<T>) -> Boolean
): Sequence<List<T>> 

The predicate would receive a window containing at most windowSize elements, and the resulting chunks would have a maximum size of chunkMaxSize, where -1 (or maybe null) would mean “unlimited”.
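A rough sketch of one possible interpretation of that signature (the exact window and split semantics here are my guess, not a settled design): the predicate sees the trailing window of at most windowSize elements including the incoming one and returns true to start a new chunk before it; a chunkMaxSize of -1 means unlimited.

```kotlin
// Hypothetical sketch of the proposed signature. The window passed to
// the predicate is the tail of the current chunk plus the incoming
// element, capped at windowSize entries.
fun <T> Sequence<T>.chunked(
    windowSize: Int,
    chunkMaxSize: Int,
    predicate: (List<T>) -> Boolean
): Sequence<List<T>> = sequence {
    val buffer = mutableListOf<T>()
    for (current in this@chunked) {
        val window = (buffer + current).takeLast(windowSize)
        val full = chunkMaxSize != -1 && buffer.size >= chunkMaxSize
        if (buffer.isNotEmpty() && (full || predicate(window))) {
            yield(buffer.toList())
            buffer.clear()
        }
        buffer.add(current)
    }
    if (buffer.isNotEmpty()) yield(buffer)
}

fun main() {
    // The original first-character use case, expressed via a window of 2.
    sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
        .chunked(2, -1) { it.size == 2 && it[0][0] != it[1][0] }
        .forEach { println(it) }
    // [Adelheid, Anna, Annika]
    // [Berthold, Bertram, Borat]
    // [Carl]
}
```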

@ilya.gorbunov Reactor provides the method bufferUntilChanged for this use case. That would be even more restricted than my first version, but seems to be good enough for many use cases.

Reactor is, by the way, a great source of inspiration for methods that would be useful for Kotlin Flow.
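For reference, a bufferUntilChanged-style operator can be sketched over sequences with a key selector (the name chunkedBy is invented here; Reactor’s actual API works on Flux, not on Kotlin sequences):

```kotlin
// Hypothetical sketch: buffer consecutive elements as long as the
// selected key stays the same, like Reactor's bufferUntilChanged.
fun <T, K> Sequence<T>.chunkedBy(keySelector: (T) -> K): Sequence<List<T>> = sequence {
    val buffer = mutableListOf<T>()
    var lastKey: K? = null
    for (current in this@chunkedBy) {
        val key = keySelector(current)
        if (buffer.isNotEmpty() && key != lastKey) {
            yield(buffer.toList())
            buffer.clear()
        }
        buffer.add(current)
        lastKey = key
    }
    if (buffer.isNotEmpty()) yield(buffer)
}

fun main() {
    sequenceOf("Adelheid", "Anna", "Annika", "Berthold", "Bertram", "Borat", "Carl")
        .chunkedBy { it[0] }
        .forEach { println(it) }
    // [Adelheid, Anna, Annika]
    // [Berthold, Bertram, Borat]
    // [Carl]
}
```

A key-based variant like this is arguably easier to use than the two-element predicate, since callers only state what property must stay constant within a chunk.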

Please file an issue here: Issues · Kotlin/kotlinx.coroutines · GitHub

I’ve actually thought of filing an issue for each operator of Flux, or at least for each operator name (so as not to become overwhelmed by all the signature variants). What do you think?

Flow seems to be a great tool in principle. If I remember a benchmark correctly, it should be significantly faster than Reactor or RxJava, but it has very few operators at the moment.

See qwwdfsad’s comment:

Take care to check for already filed issues.

My idea of filing an issue for (more or less) every Flux operator goes against the goals of the Flow developers: Flow should stay small and only include very relevant operators.

Maybe it is better to create a separate operator library, like the creator of RxJava did:
