Waiting for a value from another coroutine

My Android app has a suspend function createGroup which sends a message to a server, then receives a stream of messages back (these arrive in a Flow). The first function needs to return a ‘groupId’ value from one of these messages, but leave the Flow’s collect function running to handle further messages. At the moment I’ve done it using suspendCoroutine, as below. But is there a “better” way; something more idiomatic or simpler? Perhaps I should use another Flow to send the groupId from the subscriber coroutine to the caller even though it should only transfer one value? I’m also a bit puzzled that it wouldn’t let me call a suspend function (subscribeToGroupMessages) directly from the suspendCoroutine body, hence the extra launch. Why is that bit of code not suspendable?

    suspend fun createGroup(/* groupDetails */): String? {
        val groupId = suspendCoroutine<String?> { continuation ->
            val groupMessageFlow = grpcClient.createGroup(/*groupDetails*/)
            if (groupMessageFlow != null) {
                viewModelScope.launch {
                    subscribeToGroupMessages(groupMessageFlow, continuation)
                }
            } else {
                // Without this branch the caller would stay suspended forever
                continuation.resume(null)
            }
        }
        return groupId
    }

    private suspend fun subscribeToGroupMessages(
        groupMessageFlow: Flow<GroupMessage>,
        continuation: Continuation<String?>? = null
    ) {
        var cont = continuation
        groupMessageFlow.collect { message ->
            if (cont != null /* && message is the one containing groupId */) {
                cont?.resume(/* groupId extracted from message */)
                cont = null
            }
            /* Handle other types of message */
        }
    }

If I can ask, what does this function do with further items? I ask because it seems quite confusing to me in general. We have a createGroup() function which returns the ID of the created group. It is suspending, so it waits for the operation to finish… or maybe not. As a matter of fact, it quietly launches background processing of some sort. As a user of this function, I would never expect it to do so.

Depending on why this function starts background processing and what it does, I think we can solve this problem differently.

The app communicates with the server by gRPC. When a user creates or joins a group it subscribes to a stream of messages from the server which inform when users join or leave or their status changes. The first of these messages contains information about the group. createGroup is called by a Jetpack Compose Button’s onClick function. That part of the UI needs to be updated once the group has been created on the server, and the groupId is used to compose the new UI. The groupId is stored in a MutableState in a ViewModel, so I suppose I could just let that trigger the UI update, but it can also get updated by a joinGroup function, and in that case the process of rebuilding the UI is slightly different; after createGroup, a specific flag has to be cleared. I thought waiting for createGroup would be a good way to handle all this.

EDIT: After describing it above, I thought at first that simply responding to that groupId’s MutableState update was the “better” way I was looking for, but now I don’t think it’s that simple after all. The groupId is part of the user’s general state, but the screen with the “Create Group” button has its own state to determine which controls it should show. I don’t want these states to be combined, but using a change in groupId to trigger a change in the screen state is also problematic. Compose isn’t designed to work that way. I don’t think a @Composable function is even allowed to change its own (or any other?) state during composition; it can only do so in callbacks such as onClick.

So you want something that returns the group ID from the first item from the server and a flow containing the rest of the items from the server?
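If that is the requirement, one possible shape is a CompletableDeferred that the collector completes when the group ID arrives. This is only a sketch: the message types (GroupCreated, MemberJoined), the scope parameter and the onMessage callback are assumptions for illustration and not part of the original code.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message types; the real GroupMessage comes from the gRPC client.
sealed interface GroupMessage
data class GroupCreated(val groupId: String) : GroupMessage
data class MemberJoined(val name: String) : GroupMessage

// Returns the group ID from the first GroupCreated message, while the
// collector keeps running in `scope` to handle later messages.
suspend fun createGroup(
    scope: CoroutineScope,
    messages: Flow<GroupMessage>,
    onMessage: (GroupMessage) -> Unit,
): String? {
    val groupId = CompletableDeferred<String?>()
    scope.launch {
        try {
            messages.collect { message ->
                // complete() returns false and does nothing if already completed
                if (message is GroupCreated) groupId.complete(message.groupId)
                onMessage(message)
            }
        } finally {
            // Unblocks the caller if the stream ends or fails without a group ID
            groupId.complete(null)
        }
    }
    return groupId.await()
}

fun main(): Unit = runBlocking {
    val handled = mutableListOf<GroupMessage>()
    val flow = flowOf(GroupCreated("g1"), MemberJoined("alice"))
    val id = createGroup(this, flow) { handled.add(it) }
    println(id) // g1
}
```

In the app, the scope would presumably be viewModelScope, so the collector outlives the call to createGroup. The caller still needs to know that a background collector is being started, which is the concern raised earlier in the thread.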

To answer your subquestion: suspendCoroutine is used to convert a non-coroutine callback API into a suspend function, hence the inner block which suspendCoroutine takes as its argument is not marked ‘suspend’. You can see that in the source code in IntelliJ by checking its declaration (probably Ctrl-click it). See also kotlin - What is suspendCoroutine? - Stack Overflow
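To illustrate, here is a minimal sketch of the intended use. The callback-style function requestGroupId is hypothetical and just stands in for any non-coroutine API.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Hypothetical callback-style API standing in for any non-coroutine library.
fun requestGroupId(onResult: (String) -> Unit) {
    onResult("group-42")
}

// The block passed to suspendCoroutine is an ordinary lambda, not a suspend
// lambda. Its job is to register the callback and return; calling another
// suspend function directly inside it does not compile.
suspend fun requestGroupIdSuspending(): String = suspendCoroutine { continuation ->
    requestGroupId { id -> continuation.resume(id) }
}

fun main(): Unit = runBlocking {
    println(requestGroupIdSuspending()) // group-42
}
```

That is why the original code needed the extra launch: the only way to start suspending work from inside that block is to start a new coroutine.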

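Your ‘subscribeToGroupMessages’ function does need suspend as written, because it calls collect directly; the coroutine is launched in createGroup. If you moved the launch into ‘subscribeToGroupMessages’ itself, you could remove ‘suspend’ from it.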

It seems like a weird flow, but I don’t know your context, and don’t really want to dig that deep (so don’t bother explaining it). I just want to add that it might also make sense to use a publisher to send an event with the group ID. If it makes sense that your ‘createGroup’ does not return the value, but instead something else listens for it, then just publish the value from the flow. For instance you might have a reference to a GroupId-Manager that receives the groupId and can publish it to others who care. This makes it more event-driven, and the code snippet you pasted would become much simpler to read, but it might not fit the needs of the code which uses createGroup.

For instance a caching flow if that fits your need:

    // The element type must be given explicitly; it cannot be inferred here
    val groupIdUpdates = MutableSharedFlow<String>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    ...
    groupIdUpdates.emit(value) // publish
    ...
    groupIdUpdates.collect(collector) // collect updates
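A slightly fuller, runnable sketch of the same idea follows. The GroupIdManager class and its publish function are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the latest group ID; the name is illustrative only.
class GroupIdManager {
    // replay = 1 caches the most recent ID for late subscribers.
    // With DROP_OLDEST, tryEmit always succeeds and never suspends.
    private val _groupIdUpdates = MutableSharedFlow<String>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val groupIdUpdates: SharedFlow<String> = _groupIdUpdates

    fun publish(groupId: String) {
        _groupIdUpdates.tryEmit(groupId)
    }
}

fun main(): Unit = runBlocking {
    val manager = GroupIdManager()
    manager.publish("g1")
    manager.publish("g2")
    // A subscriber that arrives late still sees the latest cached value.
    println(manager.groupIdUpdates.first()) // g2
}
```

Because the flow replays the latest value, a collector that subscribes after the group was created still receives the ID, which may or may not be what you want for a one-off event.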