Function stops after .collect in callbackFlow

Hello. This loop repeats only once:

init {
        viewModelScope.launch(Dispatchers.IO) {
        for (i in 0..2) {
            fetch() // runs only once
            .collect {
                if (it.isNullOrEmpty()) {
                    //
                } else { proceed(it) }
            }

        }
    }
}

val firebaseDb: DatabaseReference = Firebase.database.reference

@ExperimentalCoroutinesApi
private fun fetch(): Flow<List<Data>?> = callbackFlow {
        val listener = object : ValueEventListener {
            override fun onDataChange(dataSnapshot: DataSnapshot) {
                val data = dataSnapshot.getValue<List<Data>>()
                offer(data)
            }

            override fun onCancelled(databaseError: DatabaseError) {
            }
        }
        val ref = firebaseDb.child("data")
        ref.addListenerForSingleValueEvent(listener)

        awaitClose{
            //remove listener here
            ref.removeEventListener(listener)
        }

}

But I can run fetch() multiple times with a recursion.
The same problem with
fun fetchFirebaseFlow(): StateFlow<List<Data>?>

Thanks to @nickallendev, the solution is to use close after offer .

Consider CompletableDeferred instead of callbackFlow with a single item callback.

Thanks, I was recommended to use suspendCancellableCoroutine. What’s the difference?

1 Like

Deferred is a high level abstraction, you can return it instead of a Flow.

Otherwise suspend fun fetch(): List<Data>? = suspendCancellableCoroutine { is a better option and I suggest you this way, but you cannot return a promise anymore, you have to return the result. Obviously you are free to create a Deferred only when you need it (async { fetch() }).

2 Likes

Thanks, I saw Deferred is used in one of the Retrofit samples but changed it to suspend function successfully later.