java - 将 Observables 收集到列表似乎不会立即发出集合

标签 java frp rx-java

我使用 RxJava 基本上收集单独发出的 Observable 列表,并将它们组合成 Observable 列表(本质上与 flatMap 相反)。这是我的代码:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             })

我注意到我的 onNext 从未被调用。我似乎无法理解这一点。如果我删除 .toList 调用并基本上输出单个用户(如下所示),它通过发出每个项目来工作。

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("%d users emitted", user.getName());
               }
             }));

Q1。我对 .toList 的理解有误吗?

Q2。如何梳理一个单独发出的流Observable<Object> s成单Observable<List<Object>>

** 编辑

@kjones 完全解决了这个问题。我的 findMemberships 调用没有调用 onComplete。我在下面添加了代码片段。我的实际用例因更多转换而变得更加复杂,这就是为什么我需要使用 .toList 调用。正如 @zsxwing 也正确指出的那样,对于这个用例,一个简单的 map 就足够了。

public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
  @Override
  public void call(Subscriber<? super List<Membership>> subscriber) {
    try {
      // .....
      List<Membership> memberships = queryMyDb();

      subscriber.onNext(memberships);

      // BELOW STATEMENT FIXES THE PROBLEM ><
      // subscriber.onCompleted();

    } catch (SQLException e) {
      // ...
    }
  }
});

最佳答案

它看起来像是 myEvent.findMemberships() 返回的 Observable调用永远不会调用 onComplete。你能显示这段代码吗?

如果是这样,就可以解释您所看到的行为。 .toList()在发出所有项目(由 onComplete 发出信号)之前不会发出列表。

你的第二个版本没有.toList() , 将按如下方式进行:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.subscribe()
    onNext() is called for each user
    onCompleted() is never called.

您的原始版本:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.toList()
    buffers each User waiting for onCompleted() to be called
    onCompleted is never called because the .findMemberships Observable never completes

有几种解决方法:

1) 生成 findMemberShips() Observable 调用 onComplete。如果 findMemberShips() 返回的 Observable,这可能是不可取的是一个 Rx 主题(PublishSubject、BehaviorSubject 等)

2) 使用Observable.just()而不是 Observable.from() .你已经有一个 List<User>在 .flatMap() 中,只需返回它。使用 Observable.from(users)创建一个发出每个用户的 Observable。 Observable.just(users)会创建一个 Observable 发出一个 List<User> .不需要 .toList() .

3) 使用.map()而不是 .flatMap() .同样,不需要 .toList() .由于每个 List<Membership>变成了 List<User>你只需要使用 .map() .

myEvent
    .findMemberships()
    .map(new Func1<List<Membership>, List<User>>() {
        @Override
        public List<User> call(List<Membership> memberships) {
            List<User> users = new ArrayList<User>();
            for (Membership membership : memberships) {
                users.add(membership.getUser());
            }
            return users;
         }
    })

关于java - 将 Observables 收集到列表似乎不会立即发出集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26599891/

相关文章:

java - 将 HashMap 写入 SQL 数据库时出错 - Android Studio

android - RxJava Android toMap 运算符停止执行

android - RxJava2 - 如何每次重试,除非它是 TimeoutException?

android - 如何使用 rxjava 为列表中的每个项目发出改造 api 请求?

android - 如何使用 Retrofit 2 和 RxJava 处理分页

java - 如何根据给定模式重新排列列表中的项目?

java - struts2 在操作类中获取值(value)

java - 由于找不到构造函数,从类对象动态实例化 AsyncTask 失败

javascript - 如果点击事件是在使用 Bacon.js 的拖动事件之后发生的,我该如何取消点击事件?

javascript - 利用 FRP 处理对象操作和存储