java - 将 Single 应用于 ObservableSource 并且不要过度读取

标签 java reactive-programming rx-java2

总的来说,我对 RX 很陌生,特别是 rxjava,请原谅我的错误。

此操作依赖于两个异步操作。

第一个使用过滤器函数尝试从异步 Observable 返回的列表中获取单个实体。

第二个是与设备通信并生成状态更新的 Observable 的异步操作。

我想获取从过滤器函数创建的 Single,将其应用于 pairReader(...),并订阅其 Observable 进行更新。我可以让它按所示方式工作,但前提是我包含注释的 take(1) ,否则我会收到异常,因为链尝试从 Single 中提取另一个值。

  Observable<DeviceCredential> getCredentials() {
    return deviceCredentialService()
            .getCredentials()
            .flatMapIterable(event -> event.getData());
  }

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      // A device is logically constrained to only have a single cred per org
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .take(1)  // Without this I get an exception
      .singleOrError();
  }

  Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {
    return org -> readerService().pair(name, org);
  }

getOrgFromCreds(orgid)
  .flatMapObservable(pairReader(readerid))
  .subscribe(state -> {
     switch(state) {
       case BEGUN:
         LOG.d(TAG, "Pairing begun");
         break;
       case PAIRED:
         LOG.d(TAG, "Pairing success");
         callback.success();
         break;
       case NOTIFIED_SERVER:
         LOG.d(TAG, "Pairing server notified");
         break;
     }},
     error -> {
       Crashlytics.logException(error);
       callback.error(error.getLocalizedMessage());
     });

最佳答案

如果源流发出多个项目,则 singleOrError() 应该发出错误。 Doc

对于您的情况,请改用 first()firstOrError()

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .firstOrError();
  }

关于java - 将 Single 应用于 ObservableSource 并且不要过度读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56777230/

相关文章:

java - RichFaces 列 : Saving value of attribute on a row for comparisons

java - Ng 文件上传 - url JAVA SPRING

javascript - 有条件地组合两个可观察量

javascript - 如何: request the url and zip it with response in rxjs?

android - DiffUtil 和 registerAdapterDataObserver

java - 带有空可观察值列表的 RxJava zip

java - 哪种设计模式允许基于运行时类型抽象功能?

java - 在android中顺序上传文件更安全/更好还是用多个线程同时上传文件更好?

java - 使用 rxjava 检测文件文本更改

rx-java - Retrofit2+RxJava2,无效 token ,retryWhen()重新订阅时如何更新流