Can Flow be used for an uncertain producer?

I have a use case where a producer emits elements at random intervals. I want the consumer to wait for a certain time, then give up (time out) and close the channel. Currently, I’m doing this with a buffered channel: the consumer loops over the channel inside a withTimeout block.

Having heard that flows are rad, I’m wondering if this is possible using a flow. The problem is that the consumer gets hold of the flow reference immediately, and after that there’s no way for the producer to emit more values into it (like channel.send), so the consumer always times out having received nothing. It’s almost as if I need a deferred emit that can keep appending to the flow (like a queue).

Flows and channels can be converted back and forth. Sounds like you may be looking for ReceiveChannel.consumeAsFlow. There’s also BroadcastChannel.asFlow().
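To make the consumeAsFlow suggestion concrete, here is a rough sketch (not from the original poster's code) where the consumer takes the flow before anything has been sent and the producer keeps sending afterwards. The function name collectLateValues and the delays are made up for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch

// Hypothetical helper: the flow is created first, values arrive later.
suspend fun collectLateValues(): List<Int> = coroutineScope {
    val channel = Channel<Int>(Channel.BUFFERED)

    // The consumer can hold this reference before any value exists.
    val flow = channel.consumeAsFlow()

    // The producer keeps sending at irregular intervals.
    launch {
        for (i in 1..3) {
            delay(20L * i)
            channel.send(i)
        }
        channel.close() // closing the channel completes the flow
    }

    // Suspends until the channel is closed, then returns everything received.
    flow.toList()
}
```

Note that a flow created with consumeAsFlow can only be collected once, so this fits a single consumer.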

If you are interfacing with some sort of callback/listener API (you call channel.send from some listener/callback implementation), then I recommend callbackFlow.
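A minimal sketch of what that could look like. Listener and FakeSource are stand-ins I made up for whatever callback API you actually have (they are not gRPC types), and messages() is a hypothetical extension name.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical callback interface, standing in for the real listener type.
interface Listener<T> {
    fun onNext(value: T)
    fun onError(error: Throwable)
    fun onCompleted()
}

// Hypothetical source that pushes its values synchronously on subscribe.
class FakeSource(private val values: List<String>) {
    fun subscribe(listener: Listener<String>) {
        values.forEach { listener.onNext(it) }
        listener.onCompleted()
    }

    fun unsubscribe(listener: Listener<String>) {
        // Nothing to release in this fake.
    }
}

fun FakeSource.messages(): Flow<String> = callbackFlow {
    val listener = object : Listener<String> {
        // trySend does not suspend, so it is safe to call from a callback.
        override fun onNext(value: String) { trySend(value) }
        override fun onError(error: Throwable) { close(error) }
        override fun onCompleted() { close() }
    }
    subscribe(listener)
    // Keeps the flow open until it is closed or the collector is cancelled.
    awaitClose { unsubscribe(listener) }
}
```

Collecting FakeSource(listOf("a", "b")).messages() should yield "a" and "b" and then complete, because onCompleted closes the underlying channel.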

@nickallendev callbackFlow looked promising since I’m working with a gRPC client callback, but I need to output to two different channels, which doesn’t seem to be possible using the callbackFlow.

My use case is gRPC bidirectional streaming. The client receives a server response and may generate more messages for the server (request channel). It also processes the response it received and puts the result on another channel (response channel).

I’m open to flow options, but it seems like sticking with the channels is the simplest option for now.

There is an example of doing this with Flow right here: https://kotlinlang.org/docs/reference/coroutines/flow.html#flow-cancellation-basics

You don’t cancel the flow itself; you cancel the coroutine in which the flow is being collected.
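Roughly along the lines of the linked docs example, here is a sketch of a timeout around collection. The ticks() flow, the collectFor helper, and the delay values are made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical endless producer: emits 1, 2, 3, ... every 200 ms.
fun ticks(): Flow<Int> = flow {
    var i = 0
    while (true) {
        delay(200)
        emit(++i)
    }
}

// Collects until the timeout fires. The timeout cancels the collecting
// coroutine, and the flow body stops at its next suspension point.
suspend fun collectFor(timeoutMs: Long): List<Int> {
    val seen = mutableListOf<Int>()
    withTimeoutOrNull(timeoutMs) {
        ticks().collect { seen += it }
    }
    return seen
}
```

withTimeoutOrNull returns null on timeout instead of throwing, which is convenient when giving up is an expected outcome rather than an error.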

I’m not sure why you mentioned cancellation. I think what I need is a shared flow, which seems to be a WIP. There’s also an issue with the server never actually calling onComplete or onError, only onNext. Based on a custom value received in onNext, I can call onError myself, but there’s no way for me to know when the flow is complete.
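If the end of the stream is signalled by a custom value rather than by onComplete, one possible approach is to turn that value into normal completion with takeWhile. This is only a sketch: END_MARKER and untilEndMarker are names I invented, and your real marker is presumably some field of the response message.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.takeWhile

// Hypothetical sentinel that the server sends as its last message.
const val END_MARKER = "END"

// Completes the flow as soon as the sentinel arrives. The sentinel
// itself is not emitted, and upstream collection is cancelled.
fun Flow<String>.untilEndMarker(): Flow<String> =
    takeWhile { it != END_MARKER }
```

With this, collect on the resulting flow returns once the marker is seen, so the collector learns about completion in the usual way.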

You were talking about timeout and that was an example of a timeout.

A naive implementation is not exactly difficult. This is not tested, just typed out in the forum, but the basic concept should be clear.

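import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Collects the upstream flow once in `scope` and rebroadcasts each value to all subscribers.
fun <T> Flow<T>.mySharedFlowIn(scope: CoroutineScope): Flow<T> {
    val channel = BroadcastChannel<T>(Channel.BUFFERED)
    scope.launch {
        try {
            collect { channel.send(it) }
            channel.close() // upstream completed normally
        } catch (ex: Exception) {
            channel.close(ex) // pass the upstream failure on to subscribers
        }
    }
    return channel.asFlow()
}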

You know a flow is complete when collect returns. Flow<T>.onCompletion is also an option.
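A small sketch of both signals together, using the onCompletion operator from kotlinx.coroutines. The helper name collectWithCompletionLog and the log strings are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onCompletion

// Records the order in which values, completion, and the return of collect occur.
suspend fun collectWithCompletionLog(source: Flow<Int>): List<String> {
    val log = mutableListOf<String>()
    source
        // cause is null on normal completion, non-null on failure or cancellation.
        .onCompletion { cause ->
            log += if (cause == null) "completed" else "failed: ${cause.message}"
        }
        .collect { log += "value $it" }
    // Reaching this line also means the flow has finished.
    log += "collect returned"
    return log
}
```

For flowOf(1, 2) this logs "value 1", "value 2", "completed", "collect returned", in that order.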