我有一个场景,我将从 DB
获取实体列表使用
repository.getAllByIds(ids)
这将返回 Flux<Entity>
如果 Flux 为空,那么我需要调用 handleAllEntitiesNotFound()
否则我需要打电话handleNotFoundEntities()
repository.getAllByIds(ids)
.buffer()
.switchIfEmpty(__ -> handleAllEntitiesNotFound(ids, erroneousEntities))
.flatMap(list -> handleNotFoundEntities(list))
private Flux<Entity> handleAllEntitiesNotFound(List<String> ids, List<ResponseError> erroneousEntities) {
Flux.fromIterable(ids).subscribe(id -> erroneousEntities.add(new ResponseError("Not Found", "Not Found", id)));
return Flux.empty();
}
我正在使用buffer()
将列表收集到 Flux<List<Entity>>
问题是,当我调用该服务时,它会停止,没有响应,没有日志,没有任何内容,如果我删除了行 .switchIfEmpty(__ -> handleAllEntitiesNotFound(ids, erroneousEntities))
它可以工作并返回响应,但不处理 handleAllEntitiesNotFound
使用 buffer()
可能会出现什么问题与 switchIfEmpty()
最佳答案
我认为您在这里得出了错误的结论 - buffer()
和 switchIfEmpty()
一起工作没有问题:
Flux.empty()
.buffer()
.switchIfEmpty(Mono.just(List.of(1)))
.subscribe(System.out::println); //Prints "[1]"
但是,您的 handleAllEntitiesNotFound()
方法非常可疑。您似乎正在传递一个现有列表,创建一个新的 Flux 添加到其中,然后返回一个空 Flux。该示例无法运行,因此不可能缩小确切原因的范围,但有几个点很可能是罪魁祸首(无论是单独的还是同时出现的):
- 改变传递到 react 流中的现有对象通常被认为是不好的形式。返回一个新列表更加容易和安全(如果您愿意,您可以在 react 流完成时将该列表与另一个列表合并。)
- 您创建一个
Flux
只是为了从一个列表中读取数据,并将元素添加到另一个列表中。这很令人困惑,而且没有什么意义。只需使用标准 Java 流(即ids.stream().map(id -> new ResponseError("Not Found", "Not Found", id)).collect(Collectors.toList())
.) - 您正在返回
Flux.empty()
,这几乎可以肯定是没有响应的原因。人们通常期望switchIfEmpty()
返回一个非空 Flux,除非您故意将其用作副作用。 handleNotFoundEntities
似乎是一个奇怪的方法名称选择,该方法似乎会传递找到的实体。
关于java - Flux.buffer() 不适用于 switchIfEmpty,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58656644/