spring-boot - 为什么使用 switchIfEmpty 时项目reactor会无限期挂起?

标签 spring-boot reactive-programming spring-data-mongodb project-reactor

背景

我正在使用 Spring Boot 2.2.1、project-reactor 3.3.0 和 spring-data-mongodb 2.2.1,并且我正在尝试从多个查询加载数据。我的代码大致如下:

Flux.just("type1", "type2", "type3", "type4")
    .concatMap { type ->
        reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
                                .doOnError { e ->
                                    log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
                                }.switchIfEmpty {
                                    log.warn("Failed to find any documents of type $type")
                                    Mono.empty<Map<String, Any>>()
                                }
    } 
    .. // More operations here
    .subscribe()

问题是,如果reactiveMongoOperations.find(..)没有找到给定类型的任何文档(因此“无法找到任何$type类型的文档” 已记录)整个操作将无限期挂起。如果我删除 switchIfEmpty 子句,操作就会完成并且一切正常。

问题

  1. 如果添加 switchIfEmpty 操作,为什么整个操作会挂起?如果我使用 flatMap 而不是 concatMap 也没关系,它最终还是会挂起。
  2. 我应该如何记录没有找到特定查询的文档? IE。我想记录当 reactiveMongoOperations.find(..) 返回空 Flux 时未找到任何文档。

最佳答案

当从 Kotlin 重新编写 Java 代码时(按照 Thomas 在评论中的建议),我找到了答案!我假设我使用了 reactor-kotlin-extensions 库提供的 Kotlin reactor.kotlin.core.publisher.switchIfEmpty 扩展函数:

fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })

这里的情况并非如此,因此我最终使用了 Flux 中定义的 switchIfEmpty 方法,定义如下:

public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)

为了使其在没有扩展功能的情况下工作,我可能应该这样做:

.. 
.switchIfEmpty { subscriber ->
    log.warn("Failed to find any documents of type $type")
    subscriber.onComplete()
}

我最初的解决方案不起作用,因为 Java 版本假定我创建一个Publisher(我确实这样做了)并且还调用一个在此发布者上运行(我没有)。在 Kotlin 中,如果您不需要 lambda 参数,那么它是可选的,这就是类型系统没有捕获这一点的原因。

这是 Kotlin 与 Java 互操作可能比较棘手的一种方式。

关于spring-boot - 为什么使用 switchIfEmpty 时项目reactor会无限期挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58979586/

相关文章:

java - 如何通过spring boot在启动时配置 'dispatcherServlet'负载?

java - 从数据库中检索 Spring Boot 配置

spring - @DataMongoTest 正在创建一个空的 MongoTemplate

mongodb - 调用 mongo 存储库的保存方法时未调用 Mongo Date 自定义转换器

Spring webflux : how to send Mono<T> in response body with body inserters

java - 没有可用的 'org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder' 类型的合格 bean

java - 使用 RxJava (ReactiveX) 运行 Observable 需要多长时间?

java - Observable.just(doSomeLongStuff()) 在订阅 observable 之前运行 doSomeLongStuff()

java - 如何在Rxjava中按键合并两个Observable?

java - Spring 数据 mongodb : Text search for 'phrase OR words in phrase'