Data Observers & Rx
How to observe box and query changes using ObjectBox for Java, how to integrate with RxJava.
On this page:
ObjectBox for Java - Data Observers and Reactive Extensions
ObjectBox for Java makes it easy for your app to react to data changes by providing:
data observers,
reactive extensions,
and an optional library to work with RxJava.
This makes setting up data flows easy while taking care of threading details.
Reactive Observers: A First Example
Let’s start with an example to demonstrate what you can do with reactive data observers:
The first line creates a regular query to get Task
objects where task.complete == false
. The second line connects an observer to the query. This is what happens:
the query is executed in the background
once the query finishes the observer gets the result data
whenever changes are made to
Task
objects in the future, the query will be executed againonce updated query results are in, they are propagated to the observer
the observer is called on Android’s main thread
Now, let’s dive into the details.
Data Observers Basics
When objects change, ObjectBox notifies subscribed data observers. They can either subscribe to changes of certain object types (via BoxStore) or to query results. To create a data observer you need to implement the generic io.objectbox.reactive.DataObserver
interface:
This observer will be called by ObjectBox when necessary: typically shortly after subscribing and when data changes.
Note: onData()
is called asynchronously and decoupled from the thread causing the data change (like the thread that committed a transaction).
Observing General Changes
BoxStore
allows a DataObserver
to subscribe to object types. Let’s say we have a to-do list app where Task
objects get added. To get notified when Task
objects are added in another place in our app we can do the following:
Here onData()
is not called with anything useful as data. If you need more than being notified, like to get a list of Task
objects following the above example, read on to learn how to observe queries.
Note: there is also subscribe()
which takes no arguments. It subscribes the observer to receive changes for all available object classes.
Observing Queries
ObjectBox let’s you build queries to find the objects matching certain criteria. Queries are an essential part of ObjectBox: whenever you need a specific set of data, you will probably use a query.
Combining queries and observers results in a convenient and powerful tool: query observers will automatically deliver fresh results whenever changes are made to entities in a box. Let’s say you display a list of to-do tasks in your app. You can use a DataObserver to get all tasks that are not yet completed and pass them to a method updateUi() (note that we are using lambda syntax here):
So when is our observer lambda called? Immediately when an observer is subscribed, the query will be run in a separate thread. Once the query result is available, it will be passed to the observer. This is the first call to the observer.
Now let’s say a task gets changed and stored in ObjectBox. It doesn't matter where and how; it might be the user who marked a task as completed, or some backend thread putting additional tasks during synchronization with a server. In any case, the query will notify all observers with (potentially) updated query results.
Note that this pattern can greatly simplify your code: there is a single place where your data comes in to update your user interface. There is no separate initialization code, no wiring of events, no re-running queries, etc.
See the subscribe()-method documentation for more details.
Canceling Subscriptions
When you call observer()
, it returns a subscription object implementing the io.objectbox.reactive.DataSubscription
interface:
Keep a reference to the DataSubscription
for as long as results should be received, otherwise it can be garbage collected at any point. Also call cancel()
on it once the observer should not be notified anymore, e.g. when leaving the current screen:
If you have more than one query subscription, you might find it useful to create a DataSubscriptionList
instance instead to keep track of multiple DataSubscription
objects. Pass the list to the query.subscribe(subList)
overload. A basic example goes like this:
Note: On Android, you would typically create the subscription in one of the onCreate()/onStart()/onResume()
lifecycle methods and cancel it in its counterpart onDestroy()/onStop()/onPause()
.
Observers and Transactions
Observer notifications occur after a transaction is committed. For some scenarios it is especially important to know transaction bounds. If you call box.put()
or remove()
individually, an implicit transaction is started and committed. For example, this code fragment would trigger data observers on User.class
twice:
There are several ways to combine several operations into one transaction, for example using one of the runInTx()
or callInTx()
methods in the BoxStore class. For our simple example, we can simply use an overload of put()
accepting multiple objects:
This results in a single transaction and thus in a single DataObserver
notification.
Reactive Extensions
In the first part you saw how data observers can help you keep your app state up to date. But there is more: ObjectBox comes with simple and convenient reactive extensions for typical tasks. While most of these are inspired by RxJava, they are not actually based on RxJava. ObjectBox brings its own features because not all developers are familiar with RxJava (for the RxJava ObjectBox library see below). We do not want to impose the complexity (Rx is almost like a new language to learn) and size of RxJava (~10k methods) on everyone. So, let’s keep it simple and neat for now.
Thread Scheduling
On Android, UI updates must occur on the main thread only. Luckily, ObjectBox allows to switch the observer from a background thread over to the main thread. Let’s take a look on a revised version of the to-do task example from above:
Where is the difference? The additional on()
call is all that is needed to tell where we want our observer to be called. AndroidScheduler.mainThread()
is a built-in scheduler implementation. Alternatively, you can create an AndroidScheduler
using a custom Looper
, or build a fully custom scheduler by implementing the io.objectbox.reactive.Scheduler
interface.
Transforming Data
Maybe you want to transform the data before you hand it over to an observer. Let’s say, you want to keep track of the count of all stored objects for each type. The BoxStore subscription gives you the classes of the objects, and this example shows you how to transform them into actual object counts:
Note that the transform operation takes a Class
object and returns a Long
number. Thus the DataObserver
receives the object count as a Long
parameter in onData()
.
While the lambda syntax is nice and brief, let’s look at the io.objectbox.reactive.Transformer
interface for clarification of what the transform()
method expects as a parameter:
Some additional notes on transformers:
Transforms are not required to actually “transform” any data. Technically, it is fine to return the same data that is received and just do some processing with (or without) it.
Transformers are always executed asynchronously. It is fine to perform long lasting operations.
ErrorObserver
Maybe you noticed that a transformer may throw any type of exception. Also, a DataObserver
might throw a RuntimeException
. In both cases, you can provide an ErrorObserver
to be notified about an exception that occurred. The io.objectbox.reactive.ErrorObserver
is straight-forward:
To specify your ErrorObserver
, simply call the onError()
method after subscribe()
.
Single Notifications vs. Only-Changes
When you subscribe to a query, the DataObserver
gets both of the following by default:
Initial query results (right after subscribing)
Updated query results (underlying data was changed)
Sometimes you may by interested in only one of those. This is what the methods single()
and onlyChanges()
are for (call them after subscribe()
). Single subscriptions are special in the way that they are cancelled automatically once the observer is notified. You can still cancel them manually to ensure no call to the observer is made at a certain point.
Weak References
Sometimes it may be nice to have a weak reference to a data observer. Note that for the sake of a deterministic flow, it is advisable to cancel subscriptions explicitly whenever possible. If that does not scare you off, use weak()
after subscribe()
.
Threading overview
To summarize threading as discussed earlier:
Query execution runs on a background thread (exclusive for this task)
DataTransformer
runs on a background thread (exclusive for this task)DataObserver
andErrorObserver
run on a background thread unless a scheduler is specified via theon()
method.
ObjectBox RxJava Extension Library
By design, there are zero dependencies on any Rx libraries in the core of ObjectBox. As you saw before ObjectBox gives you simple means to transform data, asynchronous processing, thread scheduling, and one time (single) notifications. However, you still might want to integrate with the mighty RxJava 2 (we have no plans to support RxJava 1). For this purpose we created the ObjectBox RxJava extension library:
It provides the classes RxQuery
and RxBoxStore
. Both offer static methods to subscribe using RxJava means.
For general object changes, you can use RxBoxStore
to create an Observable
. RxQuery
allows to subscribe to query objects using:
Flowable
Observable
Single
Example usage:
The extension library is open-source and available GitHub.
ObjectBox for Flutter/Dart - Reactive Queries
You can build a Stream from a query to get notified any time there is a change to the box of any of the queried entities:
For a Flutter app you typically want to get the latest results immediately when listening to the stream, and also get a list of objects instead of a query instance:
Last updated