Emit Flow data from an event listener function

Hello. I’m not sure if I can pass my data from an overriden function and can use Flow to pass the data from Repository to ViewModel. Here in example with Firebase I have a Suspension functions can be called only within coroutine body error. Maybe I could rewrite it somehow?

Repository

    fun fetchFirebaseFlow(): Flow<List<MyData>?> = flow {
        var ret: List<MyData>? = null
        firebaseDb.child("data").addListenerForSingleValueEvent(
            object : ValueEventListener {
                override fun onDataChange(dataSnapshot: DataSnapshot) {
                    val data = dataSnapshot.getValue<List<MyData>>()
                    emit(data) // Error. How to return the data here?
                }

                override fun onCancelled(databaseError: DatabaseError) {
                    emit(databaseError) // Error. How to return the data?
                }
            })
//        emit(ret) // Useless here
    }

ViewModel

    private suspend fun fetchFirebase() {
        repo.fetchFirebaseFlow().collect { data ->
            if (!data.isNullOrEmpty()) {
                // Add data to something
            } else {
                // Something else
            }
    }

Seems like you are looking for callbackFlow.

2 Likes

Yes, I tried it as it was recommended but my fetchFirebaseFlow became suspended and proceed only once in a loop. Also I tried MutableStateFlow but it returns data twice (first one with initialization) and also my function became suspended.

1 Like

It’s not really clear what you’re issue is. Coroutines suspend, that’s kind of the point.

Is the issue that you have code after fetchFirebase that is not running? If so then you can launch a coroutine and call fetchFirebase inside. The new coroutine will suspend but not the code that launched it.

Sure ! callBackFlow can solve this case

1 Like

This loop repeats only once:

init {
        viewModelScope.launch(Dispatchers.IO) {
        for (i in 0..2) {
            fetch().collect {
                if (it.isNullOrEmpty()) {
                    //
                } else { }
            }

        }
    }
}

val firebaseDb: DatabaseReference = Firebase.database.reference

@ExperimentalCoroutinesApi
private fun fetch(): Flow<List<Data>?> = callbackFlow {
        val listener = object : ValueEventListener {
            override fun onDataChange(dataSnapshot: DataSnapshot) {
                val data = dataSnapshot.getValue<List<Data>>()
                offer(data)
            }

            override fun onCancelled(databaseError: DatabaseError) {
            }
        }
        val ref = firebaseDb.child("data")
        ref.addListenerForSingleValueEvent(listener)

        awaitClose{
            //remove listener here
            ref.removeEventListener(listener)
        }

}

I asked this question in another topic because it’s not only about Android.

Flow.collect is like Sequence.forEach, it starts the “lazy” collection and doesn’t stop until the collection says it is done.

So a Flow that never finishes like the one you’ve implemented, will cause collect to never return.

awaitClose doesn’t return until the Flow is canceled by the collector or closed by the Flow implementation (otherwise it would finish immediately without any data). Neither happens in your code above.

You could fix your code by calling close after offer.

Honestly, Flow is not the best tool for returning a single async value. That’s like passing around a List<Int> when all you have is an Int. Plain old suspend methods are best for single results.
callbackFlow is perfect for wrapping addValueEventListener, but for wrapping addListenerForSingleValueEvent, I’d suggest using suspendCancellableCoroutine. Then fetch would become just a suspend method.

2 Likes

Thanks.

P.S. actually, I use the loop to fetch different data from Firebase, so, could be it a good idea to use flow in this case if I move the loop within fetch()?

You are probably best served by writing a general helper (something like suspend FirebaseReference.fetch(): DataSnapshot) and the using that however you need.

Flow works best if you are getting data over time and acting on that data as it comes in (as opposed to accumulating it into a data class or collection before processing) . I can’t really tell if that’s your scenario or not.

2 Likes