我使用 RxJava 基本上收集单独发出的 Observable 列表,并将它们组合成 Observable 列表(本质上与 flatMap 相反)。这是我的代码:
// myEvent.findMemberships() returns an Observable<List<Membership>>
myEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.toList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(List<User> users) {
Timber.d("%d users emitted", users.size());
}
})
我注意到我的 onNext 从未被调用。我似乎无法理解这一点。如果我删除 .toList 调用并基本上输出单个用户(如下所示),它通过发出每个项目来工作。
subscriptions //
.add(currentEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(User user) {
Timber.d("%d users emitted", user.getName());
}
}));
Q1。我对 .toList 的理解有误吗?
Q2。如何梳理一个单独发出的流Observable<Object>
s成单Observable<List<Object>>
?
** 编辑
@kjones 完全解决了这个问题。我的 findMemberships 调用没有调用 onComplete。我在下面添加了代码片段。我的实际用例因更多转换而变得更加复杂,这就是为什么我需要使用 .toList 调用。正如 @zsxwing 也正确指出的那样,对于这个用例,一个简单的 map 就足够了。
public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
@Override
public void call(Subscriber<? super List<Membership>> subscriber) {
try {
// .....
List<Membership> memberships = queryMyDb();
subscriber.onNext(memberships);
// BELOW STATEMENT FIXES THE PROBLEM ><
// subscriber.onCompleted();
} catch (SQLException e) {
// ...
}
}
});
最佳答案
它看起来像是 myEvent.findMemberships()
返回的 Observable调用永远不会调用 onComplete。你能显示这段代码吗?
如果是这样,就可以解释您所看到的行为。 .toList()
在发出所有项目(由 onComplete 发出信号)之前不会发出列表。
你的第二个版本没有.toList()
, 将按如下方式进行:
.findMemberships()
emits a single List<Membership>
.flatMap()
transforms List<Membership> into a single List<User>
Observable.from(users) creates an observable that emits each user
.subscribe()
onNext() is called for each user
onCompleted() is never called.
您的原始版本:
.findMemberships()
emits a single List<Membership>
.flatMap()
transforms List<Membership> into a single List<User>
Observable.from(users) creates an observable that emits each user
.toList()
buffers each User waiting for onCompleted() to be called
onCompleted is never called because the .findMemberships Observable never completes
有几种解决方法:
1) 生成 findMemberShips()
Observable 调用 onComplete。如果 findMemberShips()
返回的 Observable,这可能是不可取的是一个 Rx 主题(PublishSubject、BehaviorSubject 等)
2) 使用Observable.just()
而不是 Observable.from()
.你已经有一个 List<User>
在 .flatMap() 中,只需返回它。使用 Observable.from(users)
创建一个发出每个用户的 Observable。 Observable.just(users)
会创建一个 Observable 发出一个 List<User>
.不需要 .toList()
.
3) 使用.map()
而不是 .flatMap()
.同样,不需要 .toList()
.由于每个 List<Membership>
变成了 List<User>
你只需要使用 .map()
.
myEvent
.findMemberships()
.map(new Func1<List<Membership>, List<User>>() {
@Override
public List<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return users;
}
})
关于java - 将 Observables 收集到列表似乎不会立即发出集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26599891/