project-reactor - Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作

标签 project-reactor spring-reactor

当多个onErrorContinue时添加到管道以处理从 flatMap 抛出的特定类型的异常,异常处理未按预期工作。

下面的代码,我预计,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者使用。

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

这是我得到的输出:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

问题:为什么第二个onErrorContinue没有被调用,但异常发送给订阅者?

附加说明: 如果我删除第一个和第二个 onErrorContinue,那么所有异常都由第三个 onErrorContinue 处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更干净的异常处理,而不是添加 if..else block 。

这个问题与 Why does Thread.sleep() trigger the subscription to Flux.interval()? 有什么不同

1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题不关心线程,即使在后面添加Thread.sleep(10000)。订阅,行为没有变化。

最佳答案

这再次归结为 onErrorContinue 的异常行为。它打破了规则,因为它不会“捕获”错误,然后因此改变下游的行为,它实际上允许支持运算符(operator)“向前看”并采取相应的行为,从而改变上游的结果 em>.

这很奇怪,并会导致一些并不立即明显的行为,例如这里的情况。据我所知,所有支持的运算符仅向前查找 next onErrorContinue 运算符,而不是递归地向前搜索所有此类运算符。相反,它们将评估下一个 onErrorContinue 的谓词(在本例中,无论它是否属于某种类型),然后采取相应的行为 - 如果谓词返回 true,则调用处理程序,或者向下游抛出错误如果不。 (在任何情况下,它都不会移至下一个onErrorContinue 运算符,然后移至下一个,直到谓词匹配为止。)

显然这是一个人为的示例 - 但由于这些特性,我几乎总是建议避免 onErrorContinue。当涉及到 flatMap() 时,可能会出现两种“正常”的情况:

  1. 如果 flatMap() 中有一个“内部 react 链”,即它调用另一个方法或一系列返回发布者的方法 - 那么只需使用 onErrorResume( )flatMap() 调用末尾来处理这些错误。您可以链接onErrorResume(),因为它适用于下游操作符,而不是上游操作符。这是迄今为止最常见的情况。

  2. 如果 flatMap() 是 if/else 的命令式集合,它返回不同的发布者(例如这里的发布者),并且您想要/必须保持命令式风格,则抛出异常而不是使用 Mono.error(),并根据需要进行捕获,在发生错误时返回 Mono.empty():

    .flatMap(number -> {
        try {
            if (number <= 3) {
                throw new NumberLessThanThree();
            } else if (number <= 6) {
                throw new NumberLessThanSixButGreaterThan3();
            } else {
                return Mono.just(number);
            }
        }
        catch(NumberLessThanThree ex) {
            //Handle it
            return Mono.empty();
        }
        catch(NumberLessThanSixButGreaterThan3 ex) {
            //As above
        }
    })

一般来说,使用这两种方法中的一种可以更容易地推理正在发生的事情。

(为了阅读评论后的完整性 - 这与 react 链在主线程退出之前无法完成没有任何关系。)

关于project-reactor - Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62207580/

相关文章:

spring-boot - Spring Boot + JWT Oauth2 : Spring 5 vs Spring <5

Spring Reactor Webflux 调度器并行性

java - Spring Gateway AsyncPredicate 不适用于 react 堆和通量

spring - 如何在非响应式(Reactive) Spring EventListener 和响应式(Reactive) Flux 之间架起桥梁

java - Project Reactor 的 flatMap 中关于线程的混淆

java - 在 mono.compose() 内部添加 doOnSuccess() 与简单的 mono.doOnSuccess() 有什么好处

java - 使用 Spring WebClient 发出多个请求

java - 我如何等待多个单声道一次完成并获得值(value)

java - 如何在没有嵌套订阅的情况下组合/链接包含不同数据类型的多个 Mono/Flux

spring-webflux - Mono flatMap + switchIfEmpty 组合运算符?