我可以合并 Mono<List<Item>>
的列表吗?数据源整合为单个Mono<List<Item>>
包含所有项目而不阻塞?
在我的带有 Lombok 分散收集应用程序的 JDK 9 Spring Boot 2 中,此阻塞版本有效:
private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
return Mono.just( data.stream().map(m -> m.block())
.flatMap(List::parallelStream).collect(Collectors.toList()));
}
每个源数据流调用 block()
其 Mono
;我想减少block()
如果可能的话...最好调用为零。有什么想法吗?
测试用例
@RunWith(SpringRunner.class)
public class ReactiveTests {
@Test
public void testScatterGather() {
List<List<Item>> dataSet = dataSet();
Mono<List<Item>> data = gather(scatter(dataSet));
StepVerifier.create(data)
.expectNext(toItemList(dataSet))
.expectComplete();
}
private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
return Mono.just( data.stream().map(m -> m.block())
.flatMap(List::parallelStream).collect(Collectors.toList()));
}
private List<Mono<List<Item>>> scatter(List<List<Item>> data) {
return newMonoLists(data);
}
private List<Item> toItemList(List<List<Item>> data) {
return data.stream().flatMap(List::stream).collect(Collectors.toList());
}
private List<Mono<List<Item>>> newMonoLists(List<List<Item>> data) {
return data.stream().map(l -> Mono.just(l)).collect(Collectors.toList());
}
private List<List<Item>> dataSet() {
return Arrays.asList(dataSet(1L),dataSet(4L),dataSet(7L));
}
private List<Item> dataSet(long id) {
return Arrays.asList(new Item(id), new Item(id+1), new Item(id+2));
}
@Data @AllArgsConstructor private static class Item { private Long id; }
}
最佳答案
假设您有 2 个具有字符串列表的 Mono 源。
Mono<List<String>> listMono1 = Mono.just(Arrays.asList("1","3","5","7","9","11"));
Mono<List<String>> listMono2 = Mono.just(Arrays.asList("2","4","6","8","10","12"));
您可以使用方法 Flux.merge() 合并两个发布商它吸收了一批发布商并将它们合并在一起。
但是如果你像下面这样合并两个发布者,那么它最终会给你 List<List<String>>
,这是你不想要的。
Flux.merge(listMono1, listMono2).collectList();
因此,您必须修改 listMono1 和 listMono2,以便它们发出 List 的各个元素,然后合并它们。为此,您可以使用 Mono.flatMapMany()这将帮助您发出列表中的所有元素。所以,最终你会得到如下组合列表:
Flux.merge(listMono1.flatMapMany(Flux::fromIterable),listMono2.flatMapMany(Flux::fromIterable)).collectList();
希望最终是你想要的!
关于java - 分散-聚集 : combine set of Mono<List<Item>> into single Mono<List<Item>>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48451509/