我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到在错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。
我有以下 4 个片段。在某些情况下,异常将被忽略,而在其他情况下,它将进一步抛出。真正产生和消费异常的方式是什么?
片段 1
在这种情况下,异常将被忽略并且 main() 将完成而不会接收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
}).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
DONE
片段 2
与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.just(
1
).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
片段 3
我们通过调用 sink.error 来发出异常信号。 Main() 不会收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
sink.error(new RuntimeException("HELLO WORLD"));
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出:
1
2
DONE
片段 4
我们直接抛出异常。 Main() 将收到异常。
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
输出
1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
在使用 reactive-core 时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围 flux.subscribe。但在那种情况下,我总是收到 UnsupportedOperationException
而不是原始异常,然后我需要使用 Exceptions.isErrorCallbackNotImplemented
检查它是否来自 react ,提取嵌套异常然后抛出
这当然可以做到,但需要在我们使用 Flux 的每个地方都被订阅。这对我来说不太好。我在这里缺少什么?
最佳答案
在您所有的示例中,问题都是从 .subscribe(...)
重新抛出的错误处理 lambda。
如果你想在主 block 中抛出异常,使用block()
变种。
如果要测试错误是否在整个管道中传播,请使用 StepVerifier.create(pipeline).expectError(...).verify()
.
.subscribe
通常是为了获得“终端”状态的通知,而不是为了恢复或重新抛出错误(为此使用上游的 onError*
运算符)。
just
基于 -based 的示例似乎正确传播了异常,因为它们在订阅时不执行用户提供的代码,因此在 subscribe(Consumer<Throwable>)
期间没有 try/catch 到位 | .
push
, 比如 generate
/create
/defer
和 compose
,在订阅时执行一些用户定义的逻辑(Consumer<FluxSink>
)。他们防全Consumer
抛出异常并尝试传播它(作为 onError
信号)而不是直接抛出它。
但如果在 Consumer
中失败是在执行 sink
之一时引起的的方法,如果 subscriber
可能会出现问题重新抛出:我们进入一个递归,其中调用 sink 调用 sink。当我们检测到接收器的递归耗尽时,我们通过退出来防止这种无限情况。
这就是为什么 push
在 sink.next
之后触发错误的基于示例或在 sink.error
(示例 1 和 3)未能在主程序中产生异常:
-
Consumer
被应用 -
sink.next
被调用,下一个运算符创建异常 1,或sink.error
被称为 - 异常 1 到达
subscribe
并作为异常 2 重新抛出 - 这会使
Consumer.apply
短路, 异常 2 传递给sink.error
- sink 已经被调用所以我们 break out 以避免无限递归
- 从未见过异常(exception) 2
另一方面,在示例 4 中,我们不再处于调用接收器方法的中间,并且原始异常不会首先到达订阅者:
-
Consumer
被应用 - 直接抛出异常1
- 这会使
Consumer.apply
短路异常 1 传递给sink.error
- 传播到订阅
- 将其作为异常 2 重新抛出
- 异常2出现在main方法中
关于java - 从 projectreactor 使用 Flux 时生成和处理异常的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55024703/