java - 在 java flux 中按对象属性分组

标签 java reactive-programming project-reactor

给定以下数据结构 DataFlux<Data>基于某些属性实现分组到一系列列表的惯用方法是什么:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {
    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));
        test.bufferUntil(new Predicate<Data>() {
            Integer prev = null;
            @Override
            public boolean test(Data next) {
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
} 

输出:

[Hello, world]
[How, are, you]
[Bye]

我知道 Flux 上的 groupBy 函数,但这又给了我一个 Flux,而不是列表。我上面描述的当前解决方案有效,但感觉不是 100% 地道,因为我不得不使用匿名类而不是 lambda。我本可以在 lambda 之外使用 lambda 和 AtomicReference,但这也不是 100% 正确的。有什么建议吗?

最佳答案

您还可以使用 collectMultimap这让你有 Map<K, Collection<T> .在这种情况下 collectMultimap将返回:Mono<Map<Integer,Collection<Data>>> :

 test.collectMultimap( Data::getKey )
     .subscribe( dataByKey -> System.out.println( dataByKey.toString() ) );

输出:

{1=[Hello, world], 2=[How, are, you], 3=[Bye]}

关于java - 在 java flux 中按对象属性分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52432797/

相关文章:

spring-boot - 使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的有效负载

java - 如何使用 Maven 使可执行 jar 在一段时间内选择属性文件

java - UVA 11854 中的编译错误

java - 使用 WebFlux 阻止 I/O 操作

java - RxJava retryWhen 异常行为

kotlin - 如何使用reactor和R2dbc压缩嵌套列表

java - 从项目 react 器中的通量中采样除第一个元素之外的所有元素

java - 在for循环中跳过数字

java - 提供的证书和 key 作为 SSL 握手的一部分

haskell - 响应式(Reactive)香蕉:触发包含行为的最新值的事件