Variable length Sequence.chunked

I like sequenceX.chunked(n) { ... } a lot. It lets me take an unbounded sequence of large objects (video frames), bucket them into reasonably sized chunks (e.g. groups of 11 frames), median-pixel mush each chunk into a combined frame, and pass that frame to the next step. Best of all, it is “memory smart”: 11 frames get read in, they are mushed together, a single combined frame goes down the line, and the sequence continues. It is easy to imagine a factory where 11 small parts get built up into a bigger part, and the conveyor belts neither run too fast nor fall behind.

I’d like to do the same with a dynamic bucket size. Is there something that works like chunkOnceThresholdReached? I don’t think “aggregate” works, because it has to consume every frame before it produces a result (to make sure the 10,000th frame doesn’t belong in group 1), which breaks on an unbounded sequence.

mySequenceOfCameraFrames
    .chunkOnceThresholdReached { rawFrame -> isThereEnoughMotion(rawFrame) }
    .map { listOfFrames -> smushTogetherFrames(listOfFrames) }
    .forEach { combinedFrame -> showFrame(combinedFrame) }

It’s not hard to write. For example:

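// Uses sequence { } (Kotlin 1.3+), which replaced the experimental buildSequence builder.
fun <T> Sequence<T>.chunkedOnThreshold(condition: (T) -> Boolean): Sequence<List<T>> = sequence {
    val buffer = mutableListOf<T>()
    for (element in this@chunkedOnThreshold) {
        buffer += element
        if (condition(element)) {
            // Yield a copy: the buffer is cleared and reused, so yielding it directly
            // would leave every chunk a caller has kept pointing at the same list.
            yield(buffer.toList())
            buffer.clear()
        }
    }
    // Emit whatever is left when the source ends before the condition fires again.
    if (buffer.isNotEmpty()) yield(buffer.toList())
}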

Test

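import java.util.Random

fun main(args: Array<String>) {
    // Deterministic alternative:
    // (1..60).asSequence().chunkedOnThreshold { it % 7 == 0 }.forEach { println(it) }
    val random = Random()
    generateSequence { random.nextInt(10) }
        .take(100)
        // A chunk closes after each 0 or 7, the only values in 0..9 divisible by 7.
        .chunkedOnThreshold { it % 7 == 0 }
        .forEach(::println)
}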

That is EXACTLY what I was unclear on, and a perfect example. Thank you!

The toughest part is designing this function so that it is usable not only in this particular case, but in other cases too.

For example, should it close the current chunk before the element that matches the predicate, or right after it?
Is it enough to pass just an element to decide that the threshold has been reached?

We considered these overloads when we were designing the windowed/chunked functions (see “Sliding window functions”, issue #11 in Kotlin/KEEP on GitHub), but the prototypes turned out to be very convoluted.
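
To make the two design questions above concrete, here is a minimal sketch of one possible shape, not a proposal from the KEEP discussion. The names ChunkBoundary and chunkedWhen are hypothetical. The boundary argument picks whether a chunk closes before or after the matching element, and the predicate also receives the chunk collected so far, for cases where the element alone is not enough to decide.

```kotlin
// Hypothetical names: ChunkBoundary and chunkedWhen are not part of the standard library.
enum class ChunkBoundary { BEFORE_MATCH, AFTER_MATCH }

fun <T> Sequence<T>.chunkedWhen(
    boundary: ChunkBoundary = ChunkBoundary.AFTER_MATCH,
    predicate: (chunkSoFar: List<T>, element: T) -> Boolean
): Sequence<List<T>> = sequence {
    var buffer = mutableListOf<T>()
    for (element in this@chunkedWhen) {
        // The predicate sees the chunk as it stands before this element is added.
        val matches = predicate(buffer, element)
        if (matches && boundary == ChunkBoundary.BEFORE_MATCH) {
            // Close the chunk first; the matching element starts the next one.
            if (buffer.isNotEmpty()) yield(buffer)
            buffer = mutableListOf()
        }
        buffer.add(element)
        if (matches && boundary == ChunkBoundary.AFTER_MATCH) {
            // The matching element is the last one in its chunk.
            yield(buffer)
            buffer = mutableListOf()
        }
    }
    // Emit the trailing partial chunk, if any.
    if (buffer.isNotEmpty()) yield(buffer)
}

fun main() {
    val numbers = (1..10).asSequence()
    // Element-only decision, closing after each multiple of 4.
    println(numbers.chunkedWhen { _, n -> n % 4 == 0 }.toList())
    // prints [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

    // Same predicate, closing before each multiple of 4.
    println(numbers.chunkedWhen(ChunkBoundary.BEFORE_MATCH) { _, n -> n % 4 == 0 }.toList())
    // prints [[1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]

    // Decision that needs the chunk so far: close once the running sum reaches 10.
    println(numbers.chunkedWhen { chunk, n -> chunk.sum() + n >= 10 }.toList())
    // prints [[1, 2, 3, 4], [5, 6], [7, 8], [9, 10]]
}
```

Even this small sketch needs an extra enum and a two-argument predicate, which hints at why a general-purpose signature gets complicated quickly.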

I needed this kind of functionality for a flow layout: place as many items on a line as fit the width, then as many lines on a page as fit the height.

Here is the function I came up with to do it with fold (I have overloads for float and double as well):

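inline fun <T> Iterable<T>.bucketize(bucketSize: Int, selector: T.() -> Number): Iterable<Iterable<T>> =
    // Accumulator: space left in the current bucket, paired with the buckets built so far.
    // Starting at Int.MIN_VALUE makes the first item open a new bucket.
    fold(Int.MIN_VALUE to mutableListOf<MutableList<T>>()) { (left, collection), item ->
        val amount = item.selector().toInt()
        if (left >= amount) {
            // The item fits: add it to the current bucket.
            left - amount to collection.apply { last().add(item) }
        } else {
            // The item does not fit: open a new bucket that starts with it.
            bucketSize - amount to collection.apply { add(mutableListOf(item)) }
        }
    }.let { (_, collection) -> collection }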

Not sure it is the best way to do it but it is the best I came up with.
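
Since fold has to consume the whole input before returning, the version above cannot be used on an unbounded sequence. As a rough sketch of the same bucketing rule done lazily, the following uses the sequence builder instead. The name bucketized and its parameters are hypothetical, and it only handles Int sizes.

```kotlin
// Hypothetical helper, not a standard-library function.
fun <T> Sequence<T>.bucketized(capacity: Int, size: (T) -> Int): Sequence<List<T>> = sequence {
    var bucket = mutableListOf<T>()
    var remaining = capacity
    for (item in this@bucketized) {
        val amount = size(item)
        // Start a new bucket when the item does not fit.
        // An item larger than the capacity ends up in a bucket of its own.
        if (amount > remaining && bucket.isNotEmpty()) {
            yield(bucket)
            bucket = mutableListOf()
            remaining = capacity
        }
        bucket.add(item)
        remaining -= amount
    }
    // Emit the last, possibly partial, bucket.
    if (bucket.isNotEmpty()) yield(bucket)
}

fun main() {
    // Item widths laid out on lines that are 100 units wide.
    val widths = sequenceOf(30, 40, 50, 20, 90, 10)
    println(widths.bucketized(capacity = 100) { it }.toList())
    // prints [[30, 40], [50, 20], [90, 10]]
}
```

Each bucket is yielded as soon as the next item fails to fit, so only one bucket is held in memory at a time. Applying the same function again to the resulting lines, with line height as the size, would give the pages.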