java - Flux:过滤异常

标签 java spring-webflux project-reactor

我尝试使用 onErrorContinue 过滤来自 Flux/Mono 的异常,但它永远不会被触发。

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;


class FluxOnErrorContinueTest {

    /**
     * Prints the message of an error handed to an onErrorContinue callback,
     * prefixed with a label naming the Flux whose callback was invoked.
     */
    private static void report(String label, Throwable failure) {
        System.out.println(label + failure.getMessage());
    }

    /**
     * Concatenates three publishers (the middle one signals an
     * IllegalArgumentException), registers an onErrorContinue callback on each
     * of them and on the concatenated result, and then expects four items
     * followed by normal completion.
     */
    @Test
    public void fluxTest() {
        Flux<String> fluxOne = Flux.just("one")
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (ex, value) -> report("fluxOne: ", ex));

        // The only publisher in this test that signals an error.
        Flux<String> fluxTwo = Flux.<String>error(() -> new IllegalArgumentException("error"))
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (ex, value) -> report("fluxTwo: ", ex));

        Flux<String> fluxThree = Flux.just("three", "four", "five")
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (ex, value) -> report("fluxThree: ", ex));

        Flux<String> sourceFlux = fluxOne
                .concatWith(fluxTwo)
                .concatWith(fluxThree);

        // One more onErrorContinue, this time on the concatenated sequence as a whole.
        Flux<String> filteredFlux = sourceFlux
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (ex, value) -> report("filteredFlux: ", ex));

        // "one" + "three", "four", "five" = 4 expected items.
        StepVerifier.create(filteredFlux)
                .expectNextCount(4)
                .verifyComplete();
    }
}

我希望在控制台上获得四个字符串和一个带有异常消息的日志,但我得到:

12:15:15.191 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onError(java.lang.IllegalArgumentException: error))

    at reactor.test.ErrorFormatter.assertionError(ErrorFormatter.java:105)
    at reactor.test.ErrorFormatter.failPrefix(ErrorFormatter.java:94)
    at reactor.test.ErrorFormatter.fail(ErrorFormatter.java:64)
    at reactor.test.ErrorFormatter.failOptional(ErrorFormatter.java:79)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.checkCountMismatch(DefaultStepVerifierBuilder.java:1258)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignalCount(DefaultStepVerifierBuilder.java:1489)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1341)
    at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1030)
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onError(FluxContextStart.java:117)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1748)
    at reactor.core.publisher.Operators.error(Operators.java:181)
    at reactor.core.publisher.FluxErrorSupplied.subscribe(FluxErrorSupplied.java:61)
    at reactor.core.publisher.Flux.subscribe(Flux.java:7921)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
    at reactor.core.publisher.Flux.subscribe(Flux.java:7921)
    at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:801)
    at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:772)
    at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:644)
    at com.myorg.FluxOnErrorContinueTest.fluxTest(FluxOnErrorContinueTest.java:21)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Suppressed: java.lang.IllegalArgumentException: error
        at com.myorg.FluxOnErrorContinueTest.lambda$fluxTest$0(FluxOnErrorContinueTest.java:14)
        at reactor.core.publisher.FluxErrorSupplied.subscribe(FluxErrorSupplied.java:60)
        ... 48 more

如何过滤异常(相当于 try-catch)?

我还尝试了 Mono 和来自 WebClient 的响应。

在 Javadoc 中提到了“错误模式支持 javadoc 部分”,但我找不到该部分。有链接吗?

最佳答案

这里的问题是,测试中的 onErrorContinue 应用于整个 sourceFlux。也许,您需要的是使用 onErrorResume 运算符,从 concatWith 所连接的每个 Publisher 的故障中分别恢复:

    /**
     * Applies onErrorResume to each concatenated publisher separately, so that
     * an IllegalArgumentException from one of them is replaced by an empty
     * sequence; the verifier then expects four items and normal completion.
     */
    @Test
    public void fluxTest() {
        // Any error other than IllegalArgumentException is re-emitted unchanged;
        // an IllegalArgumentException is printed and replaced by an empty Flux.
        Function<Throwable, Publisher<String>> fallback = error -> {
            if (!(error instanceof IllegalArgumentException)) {
                return Flux.error(error);
            }
            System.out.println(error.getMessage());
            return Flux.empty();
        };

        Flux<String> failingPart = Flux.<String>error(() -> new IllegalArgumentException("error"))
                .onErrorResume(fallback);
        Flux<String> trailingPart = Flux.just("three", "four", "five")
                .onErrorResume(fallback);

        Flux<String> sourceFlux = Flux.just("one")
                .concatWith(failingPart)
                .concatWith(trailingPart);

        // "one" + "three", "four", "five" = 4 expected items.
        StepVerifier.create(sourceFlux)
                .expectNextCount(4)
                .verifyComplete();
    }

或者,如果您只需要处理IllegalArgumentException:

    /**
     * Variant that passes IllegalArgumentException.class to onErrorResume
     * instead of checking the error type inside the fallback: each
     * concatenated publisher gets its own typed onErrorResume, and the
     * verifier expects four items and normal completion.
     */
    @Test
    public void fluxTest() {
        // Prints the error message and substitutes an empty Flux; no type check
        // here because the exception class is passed to onErrorResume below.
        Function<Throwable, Publisher<String>> fallback = error -> {
            System.out.println(error.getMessage());
            return Flux.empty();
        };

        Flux<String> failingPart = Flux.<String>error(() -> new IllegalArgumentException("error"))
                .onErrorResume(IllegalArgumentException.class, fallback);
        Flux<String> trailingPart = Flux.just("three", "four", "five")
                .onErrorResume(IllegalArgumentException.class, fallback);

        Flux<String> sourceFlux = Flux.just("one")
                .concatWith(failingPart)
                .concatWith(trailingPart);

        // "one" + "three", "four", "five" = 4 expected items.
        StepVerifier.create(sourceFlux)
                .expectNextCount(4)
                .verifyComplete();
    }

关于 java - Flux:过滤异常,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/58114888/

相关文章:

Spring Boot 2. 异步 API。 CompletableFuture 与 Reactive

java - Jsoup:select(div[class=rslt prod]) 在不应该的时候返回 null

java - GWT 项目编译不工作 - 没有错误,在 SUPERDEV 模式下完美运行

java - 为什么我的 SecurityWebFilterChain 没有被调用?

java - 在 Spring Web 客户端中发送请求参数

spring-boot - spring boot 2 + netty + servlet.context-path + 不工作

java - 将 2 个 8 位字节数组位置合并为一个 16 位整数

java - 访问已安装认证签名

java - 无法压缩通量并返回空值。我们如何丢弃传递到通量中的事件对?

mongodb - webflux 返回一个空对象列表