我正在尝试使用 rx-java 并行验证一个大文件,并在发生任何验证错误时立即返回。 我的代码如下所示:
Observable<ValidationError> observable = Observable.fromIterable(lines).flatMap(
line -> Observable
.just(line)
.subscribeOn(Schedulers.computation())
.map(lineParser::parse)
.map(lineValidator::validator) //validation returns Optional<ValidationError>
.filter(Optional::isPresent)
.map(Optional::get)
);
这会导致 Observable 在文件中出现任何错误,但我想知道如何才能使计算在第一次发生时停止。 我见过有一个blockingFirst 方法,它似乎会返回第一个错误,但当文件中没有发现错误时,它应该抛出异常,这会使我的代码变得丑陋。 有什么帮助吗?
最佳答案
我认为您正在尝试验证某些输入 lines
并且您的 validator 方法返回 Observable<ValidationError>
。当任何输入验证失败时,您需要得到一个错误,否则验证通过。
在这种情况下,您的 validator 方法可以返回 Completable
而不是Observable<ValidationError>
(您预计第一个错误发生的地方),这更有意义
一个Completable
合约有2个回调onComplete
和onError
。在您的情况下,如果所有输入都有效,它可以触发 onComplete
信号。
否则,如果任何一项失败,它可以停止那里的进程并触发 error
信号。
ValidationError
可以扩展可抛出
private class ValidationError extends Throwable {
}
您可以使用现有代码,稍作更改,如下所示
private Completable validateLines(List<String> lines) {
return Observable.fromIterable(lines)
.subscribeOn(Schedulers.computation())
.map(lineParser::parse)
.map(lineValidator::validator) //validation returns Optional<ValidationError>
.flatMapCompletable(optional -> optional.isPresent() ? Completable.error(new ValidationError()) : Completable.complete()); // Throw an error whenever a validation error is present , else continue
}
最后,执行验证
validateLines(lines)
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
// if validation succeeds , on Complete is triggered
}
@Override
public void onError(Throwable e) {
// if any error happens , onError triggered,
if(e instanceof ValidationError){
// a validation error has happened
}
}
})
关于java - 使用 rx-java2 : get first error 进行验证,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50986425/