背景
我正在使用 Spring Boot 2.2.1、project-reactor 3.3.0 和 spring-data-mongodb 2.2.1,并且我正在尝试从多个查询加载数据。我的代码大致如下:
Flux.just("type1", "type2", "type3", "type4")
.concatMap { type ->
reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
.doOnError { e ->
log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
}.switchIfEmpty {
log.warn("Failed to find any documents of type $type")
Mono.empty<Map<String, Any>>()
}
}
.. // More operations here
.subscribe()
问题是,如果reactiveMongoOperations.find(..)
没有找到给定类型的任何文档(因此“无法找到任何$type类型的文档”
已记录)整个操作将无限期挂起。如果我删除 switchIfEmpty
子句,操作就会完成并且一切正常。
问题
- 如果添加
switchIfEmpty
操作,为什么整个操作会挂起?如果我使用flatMap
而不是concatMap
也没关系,它最终还是会挂起。 - 我应该如何记录没有找到特定查询的文档? IE。我想记录当
reactiveMongoOperations.find(..)
返回空Flux
时未找到任何文档。
最佳答案
当从 Kotlin 重新编写 Java 代码时(按照 Thomas 在评论中的建议),我找到了答案!我假设我使用了 reactor-kotlin-extensions
库提供的 Kotlin reactor.kotlin.core.publisher.switchIfEmpty
扩展函数:
fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })
这里的情况并非如此,因此我最终使用了 Flux
中定义的 switchIfEmpty
方法,定义如下:
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
为了使其在没有扩展功能的情况下工作,我可能应该这样做:
..
.switchIfEmpty { subscriber ->
log.warn("Failed to find any documents of type $type")
subscriber.onComplete()
}
我最初的解决方案不起作用,因为 Java 版本假定我创建一个Publisher
(我确实这样做了)并且还调用一个在此发布者上运行(我没有)。在 Kotlin 中,如果您不需要 lambda 参数,那么它是可选的,这就是类型系统没有捕获这一点的原因。
这是 Kotlin 与 Java 互操作可能比较棘手的一种方式。
关于spring-boot - 为什么使用 switchIfEmpty 时项目reactor会无限期挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58979586/