Kotlin flows and a pull-based approach

All of the examples I’ve seen for how to use Kotlin flows exhibit a push-based approach. By that I mean that flows are “collected”, and this collection has push-based behavior. Example:

flow.collect { value -> println(value) }

Here, the collect step will call the supplied lambda. In other words, it pushes data into the lambda function, which then has to process it somehow. The control flow is dictated by the collect function here.

I want to do the polar opposite. Something like this:

val value1 = flow.collectSingleValue()
processValue(value1)
val value2 = flow.collectSingleValue()
processValue(value2)

Here, I decide when a value is collected from the flow. I dictate the control flow. I pull a value from the flow.

This is useful for networking protocols. Example pseudocode:

while (true) {
    val headerLine = flow.readLine()
    if (headerLine.isEmpty())
        break
    parseHeader(headerLine)
}
val content = flow.read(contentSize)

Being able to partially collect data, to decide when it is collected, and to be able to break up the collection of the data into individual bits is essential here. I do not see how this could be done with a function like Flow.collect().

Can I do such a pull-based approach with flows? Or how else would you do a network IO sequence like the pseudocode above with flows? My understanding is that once collect() has run, the flow is done: it is terminated and cannot be reused. So, if I can’t do partial collections, how else would I do it?

I still use Channel for that.

I just realized my mistake. I was under the incorrect impression that once a flow is collected, the flow object is no longer usable, and another collection would be an error. But actually, collecting it again just restarts the flow.
So, I can do this:

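import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Shared counter so that every simulated read returns a different string
var globalCounter = 0

class SimulatedIO {
    fun read(): String {
        Thread.sleep(1000L) // simulates a blocking read
        globalCounter += 1
        return "readstr" + globalCounter
    }

    fun write(str: String) {
        println("write([$str])")
    }
}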


fun readWithFlow(io: SimulatedIO): Flow<String> = flow {
    val str = io.read()
    emit(str)
}.flowOn(Dispatchers.IO)


fun main(vararg args: String) = runBlocking<Unit> {
    val io = SimulatedIO()

    launch {
        var str: String

        val f = readWithFlow(io)

        str = f.first()
        println(str)
        str = f.first()
        println(str)
    }
}

This simulates how to integrate some underlying blocking IO API with flow. (The SimulatedIO read call simulates blocking by putting the thread to sleep for a while.) I can call first() multiple times, and I get another string. This works, and I think it is okay to do it like this, right? There would be no meaningful performance overhead?

Also, while this allows me to read data whenever I want, writing is still an issue. What if, for example, I want to do this:

val header = incomingFlow.first() // collects a single string, behaving like a "readLine()" function
processHeader(header)
outgoingFlow.emit(responseHeader)

I have no idea how I would integrate outgoingFlow.

With channels however, it all becomes quite intuitive. I just pass a SendChannel and a ReceiveChannel to the code that parses the incoming network data and replies to it.
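To make the channel variant concrete, here is a minimal sketch of what passing a ReceiveChannel and a SendChannel to the protocol code could look like. The handleRequest function, the header strings, and the reply text are made up for illustration, and the unlimited channels merely stand in for a real network layer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical handler: pulls header lines one at a time until an empty
// line arrives, then pushes a reply into the outgoing channel.
suspend fun handleRequest(
    incoming: ReceiveChannel<String>,
    outgoing: SendChannel<String>
): List<String> {
    val headers = mutableListOf<String>()
    while (true) {
        val line = incoming.receive() // suspends until a line is available
        if (line.isEmpty()) break
        headers += line
    }
    outgoing.send("received ${headers.size} header(s)")
    return headers
}

fun main() = runBlocking<Unit> {
    // Assumption: unlimited buffers, so send() never suspends in this demo.
    val incoming = Channel<String>(Channel.UNLIMITED)
    val outgoing = Channel<String>(Channel.UNLIMITED)

    // Stand-in for the network side feeding lines into the channel.
    launch {
        incoming.send("Host: example.org")
        incoming.send("Content-Length: 0")
        incoming.send("")
    }

    println(handleRequest(incoming, outgoing))
    println(outgoing.receive())
}
```

The handler decides when to call receive() and send(), so the control flow stays with the protocol code rather than with a collector.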

I was repeatedly told, though, that these days you want to use Flow instead of Channel in 99% of all cases. That’s why I’ve been trying to use Flow here.

Your fake IO only produces one item, so Flow and collect don’t really make sense here. A suspend method would fit better.

If the real thing will send a data stream then collect will read everything unless you force it to end somehow. And then you’d need to create a new Flow based off the partially consumed IO from before. Sounds awkward.
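A small sketch of why this gets awkward, assuming a hypothetical lines() flow that emits several values from one builder block: each call to first() starts a fresh collection, so the block runs again from the top instead of continuing where the previous call stopped.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical cold flow standing in for a multi-line data source.
fun lines(): Flow<String> = flow {
    println("flow block started")
    emit("line 1")
    emit("line 2")
    emit("line 3")
}

fun main() = runBlocking<Unit> {
    val f = lines()
    println(f.first()) // runs the block, takes "line 1", cancels the rest
    println(f.first()) // runs the block again from the top: "line 1" again
}
```

The SimulatedIO example earlier only appears to advance because the state lives in the IO object outside the flow. The flow itself keeps no position between collections.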

Flow is optimal for manipulating streams of async data and collect works great most of the time, but if it doesn’t, use a Channel. You can go from one to the other as needed with methods like produceIn and consumeAsFlow.
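As a rough illustration of one direction of that conversion, the sketch below wraps a channel with consumeAsFlow so that flow operators can be applied to it. The upperCased helper and the sample strings are invented for the example. Note that a flow obtained from consumeAsFlow can be collected only once.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: treats the channel as a flow and transforms it.
// toList() completes once the channel has been closed and drained.
suspend fun upperCased(channel: ReceiveChannel<String>): List<String> =
    channel.consumeAsFlow().map { it.uppercase() }.toList()

fun main() = runBlocking<Unit> {
    val channel = Channel<String>(Channel.UNLIMITED)
    channel.send("a")
    channel.send("b")
    channel.close() // signals the end of the stream

    println(upperCased(channel)) // [A, B]
}
```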

I see. So you would suggest an adapter class like this one:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

class SimulatedCoroutineIO(private val io: SimulatedIO) {
    suspend fun read(): String {
        return withContext(Dispatchers.IO) {
            io.read() // blocks until there is something to read / received (in production code, there would also be some way to cancel IO)
        }
    }

    suspend fun write(str: String) {
        withContext(Dispatchers.IO) {
            io.write(str) // blocks until something can be written / sent (in production code, there would also be some way to cancel IO)
        }
    }
}

This also seems very intuitive and useful. I do wonder now what the advantage of channels over this would be if I don’t do any channel buffering, fan-in / fan-out, or communication across threads. If all I want is to do network IO on top of an underlying blocking networking API, is there still a reason to go with channels instead of the code above?

If your Channel or Flow is always one item, yes. Just use a suspend method.

If I’m dealing with a stream of data sent across my app, I go with Flow. I generally use Flow anytime the stream is crossing class boundaries.

If I want a “pending work” queue to communicate across coroutines within a class, I use a Channel.

I’ll convert between the two as needed/desired.

The actual communication would use packets as units of information, not strings. But the rest applies. I’d need a suspending readPacket() function, and the underlying platform-specific IO code would do blocking reads to get a packet. So, while it is technically a stream (it is a streaming socket after all), I handle it manually, on a per-packet basis. That is, multiple readPacket() etc. calls, just like in the example. So, I’ll stick with the adapter class approach. One nice aspect of this approach, if I see this correctly, is that if I turn it into an interface with just the two abstract suspend read/write functions, then I could also write an implementation that uses flows or channels if I need to, meaning that this is an even more generic interface.
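A minimal sketch of that interface idea follows. Packet, PacketIO, and ChannelPacketIO are hypothetical names chosen for illustration, and the channel-backed class is just one possible implementation next to a blocking-IO adapter.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Placeholder packet type; a real protocol would define its own.
class Packet(val payload: ByteArray)

// The two suspend functions are all that the protocol code depends on.
interface PacketIO {
    suspend fun readPacket(): Packet
    suspend fun writePacket(packet: Packet)
}

// One possible implementation, backed by channels instead of blocking IO.
class ChannelPacketIO(
    private val incoming: ReceiveChannel<Packet>,
    private val outgoing: SendChannel<Packet>
) : PacketIO {
    override suspend fun readPacket(): Packet = incoming.receive()
    override suspend fun writePacket(packet: Packet) = outgoing.send(packet)
}

fun main() = runBlocking<Unit> {
    // Assumption: a single unlimited channel used as a loopback for the demo.
    val loopback = Channel<Packet>(Channel.UNLIMITED)
    val io: PacketIO = ChannelPacketIO(loopback, loopback)

    io.writePacket(Packet(byteArrayOf(1, 2, 3)))
    println(io.readPacket().payload.toList()) // [1, 2, 3]
}
```

Code written against PacketIO does not need to know whether packets come from a socket adapter, a channel, or a test double.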

As @fvasco correctly noted, that is what the Channel API is for. In particular, you can take any Flow, convert it to a channel with the .produceIn(scope) extension, and “pull” data from the channel by calling receive() on it as you need. Don’t forget to .cancel() the channel when you no longer need it, though.
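Sketched out, that could look roughly like the following. The readHeaders function and the sample lines are assumptions for illustration, and older kotlinx.coroutines releases marked produceIn as a preview API, so an opt-in warning may show up depending on the version.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical reader: pulls lines from the flow one by one until an
// empty line marks the end of the headers.
suspend fun readHeaders(lines: Flow<String>): List<String> = coroutineScope {
    val channel = lines.produceIn(this) // starts collecting in this scope
    try {
        val headers = mutableListOf<String>()
        while (true) {
            // receive() throws ClosedReceiveChannelException if the flow
            // ends before an empty line shows up.
            val line = channel.receive()
            if (line.isEmpty()) break
            headers += line
        }
        headers
    } finally {
        channel.cancel() // stops the producer so the scope can complete
    }
}

fun main() = runBlocking<Unit> {
    val incoming = flowOf("Host: example.org", "Content-Length: 4", "", "body")
    println(readHeaders(incoming)) // [Host: example.org, Content-Length: 4]
}
```

Cancelling the channel discards anything that was produced but not yet received, so a real protocol handler would more likely keep one channel open for the whole connection and continue calling receive() for the body.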
