当多个onErrorContinue时添加到管道以处理从 flatMap 抛出的特定类型的异常,异常处理未按预期工作。
下面的代码,我预计,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者使用。
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
这是我得到的输出:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
问题:为什么第二个onErrorContinue
没有被调用,但异常发送给订阅者?
附加说明:
如果我删除第一个和第二个 onErrorContinue
,那么所有异常都由第三个 onErrorContinue
处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更干净的异常处理,而不是添加 if..else
block 。
这个问题与 Why does Thread.sleep() trigger the subscription to Flux.interval()? 有什么不同
1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成
3)这个问题不关心线程,即使在后面添加
,行为没有变化。 Thread.sleep(10000)
。订阅
最佳答案
这再次归结为 onErrorContinue
的异常行为。它打破了规则,因为它不会“捕获”错误,然后因此改变下游的行为,它实际上允许支持运算符(operator)“向前看”并采取相应的行为,从而改变上游的结果 em>.
这很奇怪,并会导致一些并不立即明显的行为,例如这里的情况。据我所知,所有支持的运算符仅向前查找 next onErrorContinue
运算符,而不是递归地向前搜索所有此类运算符。相反,它们将评估下一个 onErrorContinue
的谓词(在本例中,无论它是否属于某种类型),然后采取相应的行为 - 如果谓词返回 true,则调用处理程序,或者向下游抛出错误如果不。 (在任何情况下,它都不会移至下一个onErrorContinue
运算符,然后移至下一个,直到谓词匹配为止。)
显然这是一个人为的示例 - 但由于这些特性,我几乎总是建议避免 onErrorContinue
。当涉及到 flatMap()
时,可能会出现两种“正常”的情况:
如果
flatMap()
中有一个“内部 react 链”,即它调用另一个方法或一系列返回发布者的方法 - 那么只需使用onErrorResume( )
在flatMap()
调用末尾来处理这些错误。您可以链接onErrorResume()
,因为它适用于下游操作符,而不是上游操作符。这是迄今为止最常见的情况。如果
flatMap()
是 if/else 的命令式集合,它返回不同的发布者(例如这里的发布者),并且您想要/必须保持命令式风格,则抛出异常而不是使用Mono.error()
,并根据需要进行捕获,在发生错误时返回Mono.empty()
:
.flatMap(number -> {
try {
if (number <= 3) {
throw new NumberLessThanThree();
} else if (number <= 6) {
throw new NumberLessThanSixButGreaterThan3();
} else {
return Mono.just(number);
}
}
catch(NumberLessThanThree ex) {
//Handle it
return Mono.empty();
}
catch(NumberLessThanSixButGreaterThan3 ex) {
//As above
}
})
一般来说,使用这两种方法中的一种可以更容易地推理正在发生的事情。
(为了阅读评论后的完整性 - 这与 react 链在主线程退出之前无法完成没有任何关系。)
关于project-reactor - Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62207580/