java - onErrorResume 运算符会忽略 flatMap 中抛出的异常

标签 java spring-webflux project-reactor

考虑以下代码:

@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {

    @Test
    public void testOnErrorResume() {
        Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing -  {}", event))
                .flatMap(event -> processEvent(event)
                        .doOnSuccess(result -> log.info("Processed - {}", event))
                        .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
    }

    private Mono<Void> processEvent(Object object) {
        return Mono.error(() -> new RuntimeException("test"));
        //throw new RuntimeException("test");
    }
    
    private Mono<Void> handleError(Throwable throwable, Object object) {
        log.error("Processing Failed - {}", object);
        
        return Mono.empty();
    }
    
}

如果方法 processEvent 返回 Mono.error 与抛出异常,则输出完全不同。

代码原样(返回 Mono.error),我看到了我所期望的,300 次迭代的处理和处理失败,并且我没有看到任何异常传播。

17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • Processing - 0 17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 1 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 2 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 3 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 4 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 4

另一方面,如果我取消注释该抛出,我会看到正在处理的 Flux 中的单个项目,我看不到来自 handleError 的消息,并且看到“异常传播”

17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • Processing - 0 17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Exception propagated java.lang.RuntimeException: test

如果这是设计使然,那么 flatMap 的最佳实践是什么?我想到的一个简单的解决方案是用 try-catch 包围 flatMap 的内容,将异常包装在 Mono.error 中。虽然它有效,但它不优雅且过于手动,可能会被遗忘。

最佳答案

创建/返回Mono的方法不应以这种方式抛出异常。由于异常是在 Mono 组装(创建)之前抛出的,因此 flatMap 内的后续运算符不可能生效,因为它们需要 Mono > 进行操作。

如果您无法控制 processEvent() 方法来修复其行为,那么您可以使用 Mono.defer 包装它,这将确保即使引发错误在组装期间将通过 flatMap 内的 Mono 传播:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing -  {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
                .doOnSuccess(result -> log.info("Processed - {}", event))
                .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))


private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}

请注意,在其他中间运算符(例如 mapdoOnNext)中,您可以自由地以丑陋的方式抛出异常,因为 Reactor 可以将它们转换为正确的错误信号,因为此时Mono 已经在进行中。

关于java - onErrorResume 运算符会忽略 flatMap 中抛出的异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68945603/

相关文章:

java - 使用 Spring WebFlux 测试时,DTO 的内部列表为空

java - 并发 hashmap 同时插入

java - 取消使用 WebFlux 创建的 SSE 流的正确方法

java - 像这样重新分配reactor Publisher Mono有什么问题吗?

Spring-Boot WebClient block() 方法返回错误 java.lang.IllegalStateException

Spring 5 Web Reactive - Web 客户端 - 在响应流上使用 flatmap()

java - 看起来 "imperative"支持响应式(Reactive)代码的代码是否常见?

java - 创建一个空的哈希集

java - 如何使用 JScrollpane 列表获取 Jlist 以显示在 JFrame 上?没有出现

java - 在java中初始化final字段