java - 从 projectreactor 使用 Flux 时生成和处理异常的正确方法是什么

标签 java reactive-programming project-reactor

我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到在错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。

我有以下 4 个片段。在某些情况下,异常将被忽略,而在其他情况下,它将进一步抛出。真正产生和消费异常的方式是什么?

片段 1

在这种情况下,异常将被忽略并且 main() 将完成而不会接收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
        }).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

DONE

片段 2

与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.just(
                1
        ).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

片段 3

我们通过调用 sink.error 来发出异常信号。 Main() 不会收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            sink.error(new RuntimeException("HELLO WORLD"));
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出:

1
2
DONE

片段 4

我们直接抛出异常。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

输出

1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

在使用 reactive-core 时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围 flux.subscribe。但在那种情况下,我总是收到 UnsupportedOperationException 而不是原始异常,然后我需要使用 Exceptions.isErrorCallbackNotImplemented 检查它是否来自 react ,提取嵌套异常然后抛出

这当然可以做到,但需要在我们使用 Flux 的每个地方都被订阅。这对我来说不太好。我在这里缺少什么?

最佳答案

在您所有的示例中,问题都是从 .subscribe(...) 重新抛出的错误处理 lambda。

如果你想在主 block 中抛出异常,使用block()变种。

如果要测试错误是否在整个管道中传播,请使用 StepVerifier.create(pipeline).expectError(...).verify() .

.subscribe通常是为了获得“终端”状态的通知,而不是为了恢复或重新抛出错误(为此使用上游的 onError* 运算符)。

just基于 -based 的示例似乎正确传播了异常,因为它们在订阅时不执行用户提供的代码,因此在 subscribe(Consumer<Throwable>) 期间没有 try/catch 到位 | .

push , 比如 generate/create/defercompose ,在订阅时执行一些用户定义的逻辑(Consumer<FluxSink>)。他们防全Consumer抛出异常并尝试传播它(作为 onError 信号)而不是直接抛出它。

但如果在 Consumer 中失败是在执行 sink 之一时引起的的方法,如果 subscriber 可能会出现问题重新抛出:我们进入一个递归,其中调用 sink 调用 sink。当我们检测到接收器的递归耗尽时,我们通过退出来防止这种无限情况。

这就是为什么 pushsink.next 之后触发错误的基于示例或在 sink.error (示例 1 和 3)未能在主程序中产生异常:

  1. Consumer被应用
  2. sink.next被调用,下一个运算符创建异常 1,或 sink.error被称为
  3. 异常 1 到达 subscribe并作为异常 2 重新抛出
  4. 这会使 Consumer.apply 短路, 异常 2 传递给 sink.error
  5. sink 已经被调用所以我们 break out 以避免无限递归
  6. 从未见过异常(exception) 2

另一方面,在示例 4 中,我们不再处于调用接收器方法的中间,并且原始异常不会首先到达订阅者:

  1. Consumer被应用
  2. 直接抛出异常1
  3. 这会使 Consumer.apply 短路异常 1 传递给 sink.error
  4. 传播到订阅
  5. 将其作为异常 2 重新抛出
  6. 异常2出现在main方法中

关于java - 从 projectreactor 使用 Flux 时生成和处理异常的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55024703/

相关文章:

tomcat - 为什么 tomcat 使用 Spring WebFlux 会达到 200+ 线程?

rx-java - 如何使用 RxJava 和 Kotlin 进行 groupBy 和收集?

javascript - 在其派生链中调用 Observable 的 onNext() 方法

spring-webflux - 当 Flux 为空时返回 404

java - 开始时无法移动组件

java - 打印出数据类型

java - 为什么 Double.MIN_VALUE 不是负数

java - ReactiveCrudRepository 与 R2dbcRepository

java - Spring-WebFlux Flux 因上下文而失败

java - android客户端无法收到python服务器的响应