java - 执行 Flux.map() 时如何处理错误

标签 java reactive-programming project-reactor

我试图弄清楚在 Flux 中映射元素时如何处理错误。

例如,我将一个 CSV 字符串解析为我的一个业务 POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

其中一些行可能包含错误,所以我在日志中得到的是:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

我在 API 中阅读了一些错误处理方法,但大多数是指返回“错误值”或使用后备 Flux,例如:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

但是,将它与我的 myflux 一起使用意味着再次处理整个通量。

那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?

使用@akarnokd 解决方法更新

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

不过,这很有魅力,因为您可以看到代码没有以前那么优雅了。 Flux API 没有任何方法可以完成这段代码的工作吗?

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)

最佳答案

您需要 flatMap 来代替,如果处理失败,您可以返回一个空序列:

myflux.flatMap(v -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});

关于java - 执行 Flux.map() 时如何处理错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36237230/

相关文章:

r - 动态着色的 slider 输入

swift - 如何在 Swift Joint 中创建自定义链?

cocoa - ReactiveCocoa 的引用所有权语义是什么?

java - 有没有办法合并多个单声道错误信号?

java - 使用 WebClient/Spring Boot 2 将 REST 响应映射到 Mono<SomeClass> 的正确方法

java - HttpURLConnection 响应代码返回 500

java - @ComponentScan 和 @Bean 在上下文配置中有什么区别?

java - C3P0 线程安全吗?

java - 在Linux中运行jnotify程序时引发异常

java - Flux 不等待 'then' 之前的元素完成