Kotlin flattenMapMerge and buffer

Hello!

Could somebody explain me how buffer applied after flattenMapMerge is fused and affects any internal buffers? If possible, with examples.

Operator fusion

Applications of flowOn, buffer, and produceIn after this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.

I saw this explanation, but didn’t get it.

Thank you!

Could anybody help me with it please?

I tried this code:

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

fun flowFrom(elem: String): Flow<String> {
    println("Called flowFrom")
    return flowOf(1, 2, 3).map { "${it}_${elem} " }.onEach { println("Numbers ${Thread.currentThread()} $it") }.onEach { delay(50) }
}

fun main() = runBlocking<Unit> {
    println("Main ${Thread.currentThread()}")
    flowOf("A", "B", "C").onEach { println("Strings ${Thread.currentThread()} $it") }.onEach { delay(50) }
        .flatMapMerge() { flowFrom(it) }.buffer(1)
        .collect { delay(500); println("Result ${Thread.currentThread()} $it") }    
}

But buffer(1) have no affect on result.

The flatMapMerge() has a default buffer size (not sure what it is, but it’s bigger than 1). The buffer() changes the size of this buffer.

What effect were you expecting from changing the buffer size?

I got it, that it should somehow affect thee buffer inside, but I still see that items are processed concurrently and buffer didn’t become full, not suspend producer etc.

So the result is:

Main Thread[main @coroutine#1,5,main]
Strings Thread[main @coroutine#2,5,main] A
Called flowFrom
Strings Thread[main @coroutine#2,5,main] B
Numbers Thread[main @coroutine#3,5,main] 1_A 
Called flowFrom
Strings Thread[main @coroutine#2,5,main] C
Numbers Thread[main @coroutine#4,5,main] 1_B 
Numbers Thread[main @coroutine#3,5,main] 2_A 
Called flowFrom
Numbers Thread[main @coroutine#5,5,main] 1_C 
Numbers Thread[main @coroutine#4,5,main] 2_B 
Result Thread[main @coroutine#1,5,main] 1_A 
Numbers Thread[main @coroutine#3,5,main] 3_A 
Result Thread[main @coroutine#1,5,main] 1_B 
Numbers Thread[main @coroutine#5,5,main] 2_C 
Result Thread[main @coroutine#1,5,main] 2_A 
Numbers Thread[main @coroutine#4,5,main] 3_B 
Result Thread[main @coroutine#1,5,main] 1_C 
Result Thread[main @coroutine#1,5,main] 2_B 
Numbers Thread[main @coroutine#5,5,main] 3_C 
Result Thread[main @coroutine#1,5,main] 3_A 
Result Thread[main @coroutine#1,5,main] 2_C 
Result Thread[main @coroutine#1,5,main] 3_B 
Result Thread[main @coroutine#1,5,main] 3_C

But I thought it would stop upstream emission or anything else with emitted values.

No, it’s behaving correctly.

Imagine the flatMapMerge() as 3 hoppers. There’s a guy who pulls an item (pretty much at random) from one of the hoppers and puts it in a bucket (the buffer). When an item is pulled out of a hopper, the hopper starts producing the next item. Then there’s another guy who takes items out of the bucket and processes them.

So the first hopper produces 1_A, which the first guy takes and puts in the bucket, and the second guy pulls it out of the bucket and starts processing it. There’s room in hopper A for another item, so it starts producing 2_A. Hopper B starts up at the same time and produces 1_B. 2_A and 1_B arrive in their hoppers at about the same time, and the guy picks 1_B and puts it in the bucket, allowing hopper B to start producing 2_B. Hopper C also starts up and produces 1_C. Now all of the hoppers are full, and so is the bucket, so processing suspends.

The second guy finishes processing 1_A and takes 1_B out of the bucket, so the first guy picks 2_A out and puts it in the bucket, allowing hopper A to start producing 3_A. When the second guy finishes with 1_B, he takes 2_A out of the bucket and the first guy puts 1_C in the bucket, allowing 2_C to start. Etc., etc.

Try setting the buffer size to 0, that might be closer to what you want.

thank you very much for the explanation