总的来说,我对 RX 很陌生,特别是 rxjava,请原谅我的错误。
此操作依赖于两个异步操作。
第一个使用过滤器函数尝试从异步 Observable 返回的列表中获取单个实体。
第二个是与设备通信并生成状态更新的 Observable 的异步操作。
我想获取从过滤器函数创建的 Single,将其应用于 pairReader(...)
,并订阅其 Observable 进行更新。我可以让它按所示方式工作,但前提是我包含注释的 take(1)
,否则我会收到异常,因为链尝试从 Single 中提取另一个值。
Observable<DeviceCredential> getCredentials() {
return deviceCredentialService()
.getCredentials()
.flatMapIterable(event -> event.getData());
}
Single<Organization> getOrgFromCreds(String orgid) {
return getCredentials()
// A device is logically constrained to only have a single cred per org
.map(DeviceCredential::getOrganization)
.filter(org -> org.getId().equals(orgid))
.take(1) // Without this I get an exception
.singleOrError();
}
Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {
return org -> readerService().pair(name, org);
}
getOrgFromCreds(orgid)
.flatMapObservable(pairReader(readerid))
.subscribe(state -> {
switch(state) {
case BEGUN:
LOG.d(TAG, "Pairing begun");
break;
case PAIRED:
LOG.d(TAG, "Pairing success");
callback.success();
break;
case NOTIFIED_SERVER:
LOG.d(TAG, "Pairing server notified");
break;
}},
error -> {
Crashlytics.logException(error);
callback.error(error.getLocalizedMessage());
});
最佳答案
如果源流发出多个项目,则 singleOrError()
应该发出错误。 Doc
对于您的情况,请改用 first()
或 firstOrError()
。
Single<Organization> getOrgFromCreds(String orgid) {
return getCredentials()
.map(DeviceCredential::getOrganization)
.filter(org -> org.getId().equals(orgid))
.firstOrError();
}
关于java - 将 Single 应用于 ObservableSource 并且不要过度读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56777230/