project-reactor - 具有短路选项的 Flux.concatDelayError

标签 project-reactor

我使用 Flux.concatDelayError 是因为我想一一订阅多个 Monos,并且还想知道是否出现了故障。

但是,现在我还想在我的一个 Monos 完成时出现特定类型的错误时进行短路。 这容易吗?

最佳答案

使用 onErrorResume 运算符,您可以为每个 Mono 配置条件回退到 Mono.empty():

package com.example;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static java.util.function.Predicate.not;

public class ReactorExample
{
    public static void main(String[] args)
    {
        Mono<String> mono = Mono.just("first").doOnNext(a -> System.out.println(a + " was called."));

        Mono<String> mono2 = Mono.<String>error(new RuntimeException("Not terminating error."))
                .onErrorResume(not(ShortCircuitingException.class::isInstance), e -> Mono.empty());

        Mono<String> mono3 = Mono.just("third").doOnNext(a -> System.out.println(a + " was called."));

        Mono<String> mono4 = Mono.<String>error(new ShortCircuitingException())
                                 .onErrorResume(not(ShortCircuitingException.class::isInstance), e -> Mono.empty());

        Mono<String> mono5 = Mono.just("fifth").doOnNext(a -> System.out.println(a + " was called."));

        Flux.concat(mono, mono2, mono3, mono4, mono5)
            .collectList()
            .block();
    }

    private static class ShortCircuitingException extends RuntimeException
    {
    }
}

输出:

first was called.
third was called.
Exception in thread "main" com.example.ReactorExample$ShortCircuitingException

关于project-reactor - 具有短路选项的 Flux.concatDelayError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59329986/

相关文章:

java - 如何在响应式(Reactive) Java 中将新对象添加到现有流中?

project-reactor - Reactor Flux - 仅在完成时从 Publisher 发出

java - CompletableFuture then 异常后撰写

java - 如何从阻塞队列创建 react 器 Flux?

java - 如何将上一步中的 Mono<> 结果传递到下一个 doOnSuccess() 方法

java - 有条件地将 Mono 与 Flux 结合起来

spring - 在 bean 初始化期间使用响应式存储库 - 我需要 block() 吗?

project-reactor - 将多个 REST 调用与 Spring Webflux 结合使用

kotlin - 将 Spring Webflux Mono 转换为 Either,最好不阻塞?

java - 使用 Reactor 抛出异常的正确方法