Use Firestore and Firebase Realtime Database with Kotlin Flow

How to use the Flow API in Kotlin with Firebase Realtime Database and Firestore to fetch data in a reactive streaming manner.
Dec 29 2022 · 5 min read

Background

By using the Flow API with Firebase Realtime Database and Firestore, you can fetch data and listen for updates in your Android app without writing nested callbacks at every call site.

In this article, we will look at how to use the Flow API in Kotlin with Firebase Realtime Database and Firestore to fetch data in a reactive streaming manner.

We will first walk through a concrete example, then create an extension function that can convert any database reference to a Flow.

Let’s get started!

Firebase Realtime Database: Read data with persistent listeners

Let’s assume we want to get a user from the path users/{id}

Let’s first define a repository that wraps the database interaction and hides the details from the ViewModel.

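import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.FirebaseDatabase
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

class UserRepository {
    private val db: FirebaseDatabase = FirebaseDatabase.getInstance()

    fun getUser(userId: String): Flow<User?> {
        return callbackFlow {
            // Reference the user node in the database
            val userRef = db.getReference("users").child(userId)

            // Add a listener to the user node
            val listener = userRef.addValueEventListener(object : ValueEventListener {
                override fun onDataChange(dataSnapshot: DataSnapshot) {
                    // Emit the user data to the flow (null if the node is empty)
                    trySend(dataSnapshot.getValue(User::class.java))
                }

                override fun onCancelled(error: DatabaseError) {
                    // Close the flow with the error so collectors can handle it, e.g. with catch
                    close(error.toException())
                }
            })

            // Suspend until the flow is closed or cancelled, then remove the listener
            awaitClose { userRef.removeEventListener(listener) }
        }
    }
}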

In this example, the FirebaseDatabase instance is created in a property initializer. The getUser function uses this instance to read the user data from the database and returns a Flow of nullable User objects.

Inside callbackFlow, it sets up a ValueEventListener to listen for updates to the specific user node in the database. The trySend function emits the updated user data whenever the node changes.

Finally, the awaitClose function removes the listener when the Flow is closed or cancelled.
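To see the register, emit, and clean-up cycle in isolation, here is a minimal sketch that runs without Firebase. FakeNode, valuesFlow, and collectTwo are hypothetical names invented for this illustration; FakeNode only stands in for a database reference that notifies listeners.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a database node that notifies registered listeners.
class FakeNode {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String) -> Unit) { listeners += listener }
    fun removeListener(listener: (String) -> Unit) { listeners -= listener }
    fun publish(value: String) = listeners.toList().forEach { it(value) }
}

// Same shape as the repository code: register, emit with trySend, clean up in awaitClose.
fun FakeNode.valuesFlow(): Flow<String> = callbackFlow {
    val listener: (String) -> Unit = { value -> trySend(value) }
    addListener(listener)
    awaitClose { removeListener(listener) }
}

// Collects the first two values, then reports how many listeners are still registered.
fun collectTwo(node: FakeNode): Pair<List<String>, Int> = runBlocking {
    val collected = async { node.valuesFlow().take(2).toList() }
    // Wait until the flow has registered its listener before publishing.
    while (node.listenerCount == 0) yield()
    node.publish("a")
    node.publish("b")
    collected.await() to node.listenerCount
}
```

Calling collectTwo(FakeNode()) should give back the two published values together with a listener count of zero, because take(2) ends the collection and that triggers the awaitClose block.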

Now, let’s use this in a ViewModel.

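import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.launch

class UserViewModel(private val repository: UserRepository) : ViewModel() {
    // Nullable because the repository emits null when the user node is empty
    private val _user = MutableLiveData<User?>()
    val user: LiveData<User?>
        get() = _user

    fun observeUser(userId: String) {
        // collect only suspends, so no dispatcher switch is needed here
        viewModelScope.launch {
            repository.getUser(userId)
                .catch { /* Handle or log the database error here */ }
                .collect { _user.postValue(it) }
        }
    }
}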

This is straightforward: we start observing the user by calling collect on the Flow and post each value to the LiveData.

That’s it if you want to write a one-time wrapper around Firebase Realtime Database.

However, most of the time you will need to do this multiple times in a project. Let’s see how we can write a simple extension to do this.

Flow Extension for Firebase Realtime Database

Basically, we need to move the code from UserRepository into an extension function on DatabaseReference. Let’s see how we can do that.

fun <T> DatabaseReference.addValueEventListenerFlow(dataType: Class<T>): Flow<T?> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            // Emit the deserialized value (null if the node is empty)
            val value = dataSnapshot.getValue(dataType)
            trySend(value)
        }

        override fun onCancelled(error: DatabaseError) {
            // Close the flow with the error so collectors can handle it
            close(error.toException())
        }
    }
    addValueEventListener(listener)
    // Remove the listener when the flow is closed or cancelled
    awaitClose { removeEventListener(listener) }
}

This extension function takes dataType as an argument and sets up a ValueEventListener to listen for updates to the node in the database.

To use this extension function, you can simply call it on the userRef object. For example:

fun getUser(userId: String): Flow<User?> {
    val userRef = FirebaseDatabase.getInstance().getReference("users").child(userId)
    return userRef.addValueEventListenerFlow(User::class.java)
}
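If passing User::class.java at every call site feels noisy, one option is a reified overload. This is only a sketch: valueFlow is a hypothetical name, and it assumes the addValueEventListenerFlow extension shown earlier is in scope.

```kotlin
import com.google.firebase.database.DatabaseReference
import kotlinx.coroutines.flow.Flow

// Hypothetical convenience overload: the class is inferred from the reified
// type parameter and passed on to the addValueEventListenerFlow extension.
inline fun <reified T : Any> DatabaseReference.valueFlow(): Flow<T?> =
    addValueEventListenerFlow(T::class.java)
```

With it, the call site could read userRef.valueFlow<User>().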

This extension function can be used to quickly convert ValueEventListener into a Flow, allowing you to use the standard Flow operators to transform and consume the data in a reactive way.
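As a small illustration of what those operators can look like, the sketch below turns a Flow of nullable users into a stream of display names. The User fields and the displayNames function are assumptions made for this example, not part of the article's model.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical model, standing in for the article's User class.
data class User(val id: String, val name: String)

// Drops missing users, extracts a trimmed name, and skips consecutive duplicates.
fun Flow<User?>.displayNames(): Flow<String> =
    filterNotNull()
        .map { it.name.trim() }
        .distinctUntilChanged()
```

Because the result is still a Flow, it can be collected in a ViewModel exactly like the repository flow.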

Firestore: Read updates with snapshot listener

Now let’s see how we can listen to real-time updates from Firestore.

private val firestore = FirebaseFirestore.getInstance()

fun getUser(userId: String): Flow<User?> = callbackFlow {
    val listener = object : EventListener<DocumentSnapshot> {
        override fun onEvent(snapshot: DocumentSnapshot?, exception: FirebaseFirestoreException?) {
            if (exception != null) {
                // An error occurred: close the flow with it so collectors can handle it
                close(exception)
                return
            }

            if (snapshot != null && snapshot.exists()) {
                // The user document has data
                val user = snapshot.toObject(User::class.java)
                trySend(user)
            } else {
                // The user document does not exist or has no data
                trySend(null)
            }
        }
    }

    val registration = firestore.collection("users").document(userId).addSnapshotListener(listener)
    // Remove the listener when the flow is closed or cancelled
    awaitClose { registration.remove() }
}

The getUser method uses the addSnapshotListener method to listen for updates to the user document and emit the updated data to the flow.

Let’s use it in a ViewModel.

class UserViewModel(private val repository: UserRepository) : ViewModel() {
    private val _user = MutableStateFlow<User?>(null)
    val user: StateFlow<User?>
        get() = _user

    fun getUser(userId: String) {
        // collect only suspends, so no dispatcher switch is needed here
        viewModelScope.launch {
            repository.getUser(userId)
                .catch { /* Handle or log the Firestore error here */ }
                .collect { user -> _user.value = user }
        }
    }
}

Here we have used a StateFlow to expose the updates from the repository to the UI.
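An alternative to collecting manually is the stateIn operator, which converts a Flow into a StateFlow directly. The sketch below uses plain Kotlin so it runs outside Android: UserStateHolder is a hypothetical stand-in for a ViewModel, and its scope parameter plays the role of viewModelScope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical model, standing in for the article's User class.
data class User(val name: String)

// Plain-Kotlin stand-in for a ViewModel; `scope` plays the role of viewModelScope.
class UserStateHolder(source: Flow<User?>, scope: CoroutineScope) {
    // The upstream flow is collected only while `user` has observers,
    // and is kept alive for 5 seconds after the last observer leaves.
    val user: StateFlow<User?> = source.stateIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = null,
    )
}
```

With this approach the database listener is attached only while the UI is observing, at the cost of choosing a sharing policy up front.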

Now let’s create an extension function for the EventListener interface in Firestore, to avoid boilerplate code.

Flow Extension for Firestore

The extension functions for the Realtime Database DatabaseReference and the Firestore DocumentReference are quite similar.

Let’s see how we can write an extension.

fun <T> DocumentReference.addSnapshotListenerFlow(dataType: Class<T>): Flow<T?> = callbackFlow {
    val listener = object : EventListener<DocumentSnapshot> {
        override fun onEvent(snapshot: DocumentSnapshot?, exception: FirebaseFirestoreException?) {
            if (exception != null) {
                // An error occurred: close the flow with it so collectors can handle it
                close(exception)
                return
            }

            if (snapshot != null && snapshot.exists()) {
                // The document has data
                val data = snapshot.toObject(dataType)
                trySend(data)
            } else {
                // The document does not exist or has no data
                trySend(null)
            }
        }
    }

    val registration = addSnapshotListener(listener)
    // Remove the listener when the flow is closed or cancelled
    awaitClose { registration.remove() }
}

The above extension function defines an EventListener object that listens for updates to the document and emits the updated data to the flow, just like the Realtime Database extension function.

To use the extension function, you can simply call it on a DocumentReference object and collect the emitted items from the flow. For example:

fun getUser(userId: String): Flow<User?> {
    val userRef = firestore.collection("users").document(userId)
    return userRef.addSnapshotListenerFlow(User::class.java)
}

I hope the above code examples help you understand how to use the callbackFlow function and the EventListener interface in Firestore to fetch data and listen for updates in an Android app.

Remember to handle errors and cancel the flow when necessary, and make sure to remove the listener when it is no longer needed to avoid leaking resources.
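For the error-handling part, here is a minimal sketch using the catch operator. failingUpdates is a hypothetical flow that fails after one value, standing in for a listener that reports an error; collectSafely is likewise an invented helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source that fails after one value, standing in for a listener error.
fun failingUpdates(): Flow<String> = flow {
    emit("first")
    throw IllegalStateException("permission denied")
}

// Collects every value and records an upstream failure instead of crashing.
fun collectSafely(source: Flow<String>): List<String> = runBlocking {
    val log = mutableListOf<String>()
    source
        .catch { e -> log += "error: ${e.message}" }
        .collect { log += "value: $it" }
    log
}
```

The catch operator only sees failures from the upstream flow, so it belongs before collect in the chain.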

Conclusion

That’s it for today, I hope you learned something!

These are just the basics of reading from the Firebase databases, but they are very useful because listening for real-time changes is where Flow fits best. Add, update, and delete calls are mostly one-time calls, so there is no need to use Flow for those operations.
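For such one-time calls, a suspend function is usually enough. The sketch below assumes the kotlinx-coroutines-play-services artifact is on the classpath, which provides the await() extension on Task, and that User is the article's model class; saveUser and fetchUserOnce are hypothetical names.

```kotlin
import com.google.firebase.firestore.FirebaseFirestore
import kotlinx.coroutines.tasks.await

// Writes the user document once and suspends until the write completes or fails.
suspend fun saveUser(firestore: FirebaseFirestore, userId: String, user: User) {
    firestore.collection("users").document(userId).set(user).await()
}

// Reads the user document once; returns null if the document does not exist.
suspend fun fetchUserOnce(firestore: FirebaseFirestore, userId: String): User? =
    firestore.collection("users").document(userId).get().await()
        .toObject(User::class.java)
```

Both functions throw if the underlying Task fails, so call them inside a try/catch where the error can be handled.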

Radhika saliya
Mobile App Developer | Sharing knowledge of Jetpack Compose & android development


2025 Canopas Software LLP. All rights reserved.