rx-java - Rx 等价于 COUNT 和 GROUP BY?

标签 rx-java reactive-programming

我有一个重复元素列表,比如:

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A");

我想按它们的值以及它们出现的次数对它们进行分组,因此输出将是成对的:
{"A", 3}, {"B", 1}, {"C", 2}

基本上相当于 SQL 语句,如 SELECT x, COUNT(1) GROUP BY x;
我只能对他们调用 groupBy :
source.groupBy(x -> x, x -> 1)

但这会将流转换为 GroupedObservables,我找不到如何继续使用它们的好示例。我试过 reduce() ,但这里不好,因为在 groupBy() 之后它想要减少 GroupedObservables,而不是每个组内的元素。

GroupedObservables 可以做到这一点吗?有没有其他方法可以达到预期的结果?

最佳答案

以下代码:

source.groupBy(val -> val)
    .flatMap(
        gr -> gr.count()
                .map(count -> new Pair<>(gr.getKey(), count)
    )
).subscribe(System.out::println);

会打印出来:
A=3
B=1
C=2

关于rx-java - Rx 等价于 COUNT 和 GROUP BY?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39501772/

相关文章:

java - 连接 "n"个观察者后 refCount : call underlying ConnectObservable. connect()

Android,RXJava,使用去抖动搜索

java - 在没有事件总线的情况下触发事件时自动从 observable 获取项目

java - Spring react : convert Flux<Wrapper<X>> to Flux<X>

spring-boot - 在错误情况下从 WebClient 获取响应主体的正确方法是什么?

spring - boundedElastic() 与 parallel() 调度程序之间的区别

f# 可观察的 fork 和副作用

android - 多个顺序 Observable,每个 Observable 之间有时间延迟

java - RxJava 仅检查超时的第一个响应项

flutter - 在Dart中关闭流后可以重新打开流吗