我是 rxJava 的新手,真的很困惑,我想先让我的应用程序离线,我决定使用 Realm 和 Retrofit,首先我想从 retrofit 获取数据,然后从我的远程获取数据webservice 然后,使用 realm 的 insertOrUpdate
将远程对象与本地对象合并。到目前为止,我能够继续这个过程,但是当我在 stetho 上查看我的网络请求时,这个方法已经完成了无限次请求。我哪里做错了?这是函数
public Observable<RealmResults<Event>> all() {
Realm realm = Realm.getDefaultInstance();
return realm.where(Event.class).findAllAsync()
.asObservable()
.filter(new Func1<RealmResults<Event>, Boolean>() {
@Override
public Boolean call(RealmResults<Event> events) {
return events.isLoaded();
}
})
.doOnNext(new Action1<RealmResults<Event>>() {
@Override
public void call(RealmResults<Event> events) {
service.getEvents()
.subscribeOn(Schedulers.io())
.subscribe(new Action1<List<Event>>() {
@Override
public void call(final List<Event> events) {
try(Realm realm = Realm.getDefaultInstance()) {
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
realm.insertOrUpdate(events);
}
});
} // auto-close
}
});
}
});
}
这是我在 Activity 中使用的函数
private void getEvents() {
Log.i("EVENTSELECTION", "STARTING");
repository.all()
.subscribe(new Subscriber<List<Event>>() {
@Override
public void onCompleted() {
Log.i("EVENTSELECTION", "Task Completed");
swipeRefreshLayout.setRefreshing(false);
}
@Override
public void onError(Throwable e) {
Log.e("EVENTSELECTION", e.getMessage());
swipeRefreshLayout.setRefreshing(false);
e.printStackTrace();
}
@Override
public void onNext(List<Event> events) {
Log.i("EVENTSELECTION", String.valueOf(events.size()));
}
});
}
非常感谢。
最佳答案
Where did I go wrong?
让我们来看看:
1.
public Observable<RealmResults<Event>> all() {
Realm realm = Realm.getDefaultInstance();
这将打开一个永远不会关闭的 Realm 实例。所以你的Realm生命周期管理是错误的,引用documentation for best practices .
2.
return realm.where(Event.class).findAllAsync()
.asObservable() // <-- listens for changes in the Realm
// ...
.doOnNext(new Action1<RealmResults<Event>>() {
@Override
public void call(RealmResults<Event> events) {
service.getEvents() // <-- downloads data
.subscribeOn(Schedulers.io())
.subscribe(new Action1<List<Event>>() {
您基本上是说“如果 Realm 中的数据发生任何更改,则从服务下载数据并将其写入 Realm”
这将触发 RealmChangeListener,后者将触发下载等。
这是一个概念性错误,您正在使用 Realm notifications不正确。
RealmResults<T>
不仅仅是一个对象列表,它也是对更改的订阅。因此,您需要将其作为现场引用,并“保持订阅数据库中的变化”。
RealmResults<Sth> results;
RealmChangeListener<RealmResults<Sth>> changeListener = (element) -> {
if(element.isLoaded()) {
adapter.updateData(element);
}
};
void sth() {
results = realm.where(Sth.class).findAllSortedAsync("id");
results.addChangeListener(changeListener);
}
void unsth() {
if(results != null && results.isValid()) {
results.removeChangeListener(changeListener);
results = null;
}
}
在你的例子中,RealmResults<T>
表示订阅并提供对当前/新数据的访问被包装为 Observable<T>
您可以为其创建订阅者。
Observable<List<<Sth>> results;
Subscription subscription;
Action1<List<Sth>> changeListener = (element) -> {
if(element.isLoaded()) {
adapter.updateData(element);
}
};
void sth() {
results = realm.where(Sth.class).findAllSortedAsync("id").asObservable();
subscription = results.subscribe(changeListener);
}
void unsth() {
if(subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
subscription = null;
results = null;
}
}
如您所见,您在组件的开头有一个订阅,在组件的结尾有一个取消订阅。
调用 Observable.first()
是不正确的,这样做没有意义。如果您在任何教程中看到它(我以前见过它......),那么该教程是错误的。
关于java - RxJava + Retrofit + Realm 正在做无限制的获取请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45136201/