Function stops after .collect in callbackFlow

Hello. This loop executes only its first iteration:

init {
    viewModelScope.launch(Dispatchers.IO) {
        for (i in 0..2) {
            fetch() // runs only once
                .collect {
                    if (it.isNullOrEmpty()) {
                        //
                    } else {
                        proceed(it)
                    }
                }
        }
    }
}

val firebaseDb: DatabaseReference = Firebase.database.reference

@ExperimentalCoroutinesApi
private fun fetch(): Flow<List<Data>?> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            val data = dataSnapshot.getValue<List<Data>>()
            offer(data)
        }

        override fun onCancelled(databaseError: DatabaseError) {
        }
    }
    val ref = firebaseDb.child("data")
    ref.addListenerForSingleValueEvent(listener)

    awaitClose {
        // remove listener here
        ref.removeEventListener(listener)
    }
}

However, I can run fetch() multiple times if I call it recursively.
The same problem occurs with a function that returns a StateFlow:
fun fetchFirebaseFlow(): StateFlow<List<Data>?>

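Thanks to @nickallendev, the solution is to call close() right after offer(). collect suspends until the flow completes, and a callbackFlow completes only when its channel is closed. Without close(), awaitClose keeps the flow open indefinitely, so the first collect never returns and the loop never reaches its second iteration.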
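For illustration, here is a minimal sketch of fetch() with that fix applied. It assumes kotlinx.coroutines 1.5 or later, where trySend replaces the deprecated offer (on older versions, keep offer and the @ExperimentalCoroutinesApi annotation from the question). The Data class is a hypothetical stand-in because the question does not show it, and passing firebaseDb as a parameter is only done to keep the snippet self-contained. Closing the flow with the error in onCancelled is an addition not present in the original code.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import com.google.firebase.database.ktx.getValue
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical stand-in for the question's Data type (not shown in the post).
data class Data(val name: String = "")

// Sketch: firebaseDb is passed in here only to keep the example self-contained.
fun fetch(firebaseDb: DatabaseReference): Flow<List<Data>?> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            // Emit the single value (trySend replaces offer in coroutines 1.5+).
            trySend(dataSnapshot.getValue<List<Data>>())
            // Close the channel so the flow completes and collect() returns.
            close()
        }

        override fun onCancelled(databaseError: DatabaseError) {
            // Addition to the original: fail the flow instead of hanging forever.
            close(databaseError.toException())
        }
    }
    val ref = firebaseDb.child("data")
    ref.addListenerForSingleValueEvent(listener)

    // Runs once the channel is closed or the collector is cancelled.
    awaitClose { ref.removeEventListener(listener) }
}
```

With this version each collect call in the for loop returns after one value, so all three iterations run. If changing the flow itself is not an option (for example with a StateFlow, which never completes), taking a single value with the first() operator instead of collect is another possibility.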

For a callback that delivers only a single item, consider CompletableDeferred instead of callbackFlow.
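A rough sketch of the CompletableDeferred approach could look like the following. The function name fetchOnce, the ref parameter, and the Data class are all hypothetical; only the Firebase listener calls come from the question.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import com.google.firebase.database.ktx.getValue
import kotlinx.coroutines.CompletableDeferred

// Hypothetical stand-in for the question's Data type (not shown in the post).
data class Data(val name: String = "")

// Hypothetical helper: suspends until the single-value callback fires once.
suspend fun fetchOnce(ref: DatabaseReference): List<Data>? {
    val result = CompletableDeferred<List<Data>?>()

    ref.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            // Deliver the value to the waiting coroutine.
            result.complete(dataSnapshot.getValue<List<Data>>())
        }

        override fun onCancelled(databaseError: DatabaseError) {
            // Propagate the failure so await() throws instead of hanging.
            result.completeExceptionally(databaseError.toException())
        }
    })

    return result.await()
}
```

Because fetchOnce is a plain suspend function that returns a value, the loop in init can call it directly, e.g. `val data = fetchOnce(firebaseDb.child("data"))`, without collect. Note that this sketch does not detach the listener if the calling coroutine is cancelled before the callback fires.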