java - 分散-聚集 : combine set of Mono<List<Item>> into single Mono<List<Item>>

标签 java spring reactive-programming java-9 spring-webflux

我可以合并 Mono<List<Item>> 的列表吗?数据源整合为单个Mono<List<Item>>包含所有项目而不阻塞?

在我的带有 Lombok 分散收集应用程序的 JDK 9 Spring Boot 2 中,此阻塞版本有效:

    private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
        return Mono.just( data.stream().map(m -> m.block())
                .flatMap(List::parallelStream).collect(Collectors.toList()));
    }

每个源数据流调用 block()Mono ;我想减少block()如果可能的话...最好调用为零。有什么想法吗?

测试用例

@RunWith(SpringRunner.class)
public class ReactiveTests {
    @Test
    public void testScatterGather() {
        List<List<Item>> dataSet = dataSet();
        Mono<List<Item>> data = gather(scatter(dataSet));
        StepVerifier.create(data)
            .expectNext(toItemList(dataSet))
            .expectComplete();
    }

    private Mono<List<Item>> gather(List<Mono<List<Item>>> data) {
        return Mono.just( data.stream().map(m -> m.block())
                .flatMap(List::parallelStream).collect(Collectors.toList()));
    }

    private List<Mono<List<Item>>> scatter(List<List<Item>> data) {
        return newMonoLists(data);
    }

    private List<Item> toItemList(List<List<Item>> data) {
        return data.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    private List<Mono<List<Item>>> newMonoLists(List<List<Item>> data) {
        return data.stream().map(l -> Mono.just(l)).collect(Collectors.toList());
    }

    private List<List<Item>> dataSet() {
        return Arrays.asList(dataSet(1L),dataSet(4L),dataSet(7L));
    }

    private List<Item> dataSet(long id) {
        return Arrays.asList(new Item(id), new Item(id+1), new Item(id+2));
    }
    @Data @AllArgsConstructor private static class Item { private Long id; }
}

最佳答案

假设您有 2 个具有字符串列表的 Mono 源。

Mono<List<String>> listMono1 = Mono.just(Arrays.asList("1","3","5","7","9","11"));
Mono<List<String>> listMono2 = Mono.just(Arrays.asList("2","4","6","8","10","12"));

您可以使用方法 Flux.merge() 合并两个发布商它吸收了一批发布商并将它们合并在一起。

但是如果你像下面这样合并两个发布者,那么它最终会给你 List<List<String>> ,这是你不想要的。

Flux.merge(listMono1, listMono2).collectList();

因此,您必须修改 listMono1 和 listMono2,以便它们发出 List 的各个元素,然后合并它们。为此,您可以使用 Mono.flatMapMany()这将帮助您发出列表中的所有元素。所以,最终你会得到如下组合列表:

Flux.merge(listMono1.flatMapMany(Flux::fromIterable),listMono2.flatMapMany(Flux::fromIterable)).collectList();

希望最终是你想要的!

关于java - 分散-聚集 : combine set of Mono<List<Item>> into single Mono<List<Item>>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48451509/

相关文章:

java - 如何在 Java 源文件中获取给定行号的周围方法

java - 为什么 java.lang.Long 的 .longValue() 将其 (long) 实例值转换为 long?

java - 是否可以在 Spring Controller 中拆分请求参数?

javascript - 在react.js中使用路由时,向 `component`提供的 Prop `Route`无效

Angular/RxJS - 有一个 RxJS 管道用于启动流吗?

java - Android 中的主页按钮覆盖。重启手机后启动问题

java - 当服务器抛出 IllegalArgrumentException 等应用程序异常时,预期响应是什么?

html - 我需要 Spring API CSS 文件

java - 从 RequestContext 或类似的访问 StreamListener header

java - 谓词过滤器 rxjava2 - 如何传递动态过滤器参数