java - RxJava : merge() changes the order of the emitted items?

标签 java functional-programming rx-java reactive-programming rx-java2

看看这两个小测试:

@Test
public void test1() {
    Observable.range(1, 10)
        .groupBy(v -> v % 2 == 0)
        .flatMap(group -> {
            if (group.getKey()) {
                return group;
            }
            return group;
        })
        .subscribe(System.out::println);
}

@Test
public void test2() {
    Observable.range(1, 10)
        .groupBy(v -> v % 2 == 0)
        .toMap(g -> g.getKey())
        .flatMapObservable(m -> Observable.merge(
            m.get(true),
            m.get(false)))
        .subscribe(System.out::println);
}

我期望两者都以相同的顺序返回数字列表,所以:

1 2 3 4 5 6 7 8 9 10

但是第二个例子返回

2 4 6 8 10 1 3 5 7 9

相反。

似乎在第二个示例中,merge 执行的是 concat,事实上,如果我将其更改为 concat,结果是一样的。

我错过了什么?

谢谢。

最佳答案

基本上flatMapmerge不保证发出的项目的顺序。

来自 flatMap文档:

Note that FlatMap merges the emissions of these Observables, so that they may interleave.

来自 merge文档:

Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

引自此SO Answer :

In your case, with single-element, static streams, it is not making any real difference (but in theory, merge could output words in random order and still be valid according to spec)

如果您需要保证订单,请使用 concat*相反。

第一个例子

它的工作原理是这样的:

  • 1发出 groupBy运算符(operator)将创建一个 GroupedObservable用 key false
    • flatMap将从这个可观察对象中输出项目——目前只有 1
  • 2发出 groupBy运算符(operator)将创建一个 GroupedObservable用 key true
    • flatMap现在还将输出第二个 GroupedObservable 中的项目- 当前为 2
  • 3发出 groupBy运算符(operator)会将其添加到现有的 GroupedObservable用 key falseflatMap将立即输出此项目
  • 4发出 groupBy运算符(operator)会将其添加到现有的 GroupedObservable用 key trueflatMap将立即输出此项目
  • 等等

它可能会帮助您添加更多日志记录:

    Observable.range(1, 10)
            .groupBy(v -> v % 2 == 0)
            .doOnNext(group -> System.out.println("key: " + group.getKey()))
            .flatMap(group -> {
                if (group.getKey()) {
                    return group;
                }
                return group;
            })
            .subscribe(System.out::println);

那么输出是:

key: false
1
key: true
2
3
...

第二个例子

这是完全不同的,因为toMap将阻塞直到上游完成:

  • 1发出 groupBy运算符(operator)将创建一个 GroupedObservable用 key false
    • toMap将添加此 GroupedObservable到内部映射并使用键 false (与 GroupedObservable 具有相同的 key )
  • 2发出 groupBy运算符(operator)将创建一个 GroupedObservable用 key true
    • toMap将添加此 GroupedObservable到内部映射并使用键 true (与 GroupedObservable 具有相同的键) - 所以现在 map 有 2 GroupedObservables
  • 将以下数字添加到相应的GroupedObservables当源完成时,toMap运算符(operator)完成并将 map 传递给下一个运算符(operator)
  • flatMapObservable您使用 map 创建一个新的可观察对象,首先添加偶数元素 (key = true),然后添加奇数元素 (key = false)

您还可以在此处添加更多日志记录:

    Observable.range(1, 10)
            .groupBy(v -> v % 2 == 0)
            .doOnNext(group -> System.out.println("key: " + group.getKey()))
            .toMap(g -> g.getKey())
            .doOnSuccess(map -> System.out.println("map: " + map.size()))
            .flatMapObservable(m -> Observable.merge(
                    m.get(true),
                    m.get(false)
            ))
            .subscribe(System.out::println);

那么输出是:

key: false
key: true
map: 2
2
4
6
8
10
1
3
5
7
9

关于java - RxJava : merge() changes the order of the emitted items?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51440018/

相关文章:

java - volatile ,在 x86 处理器上没有用处

java - Keytool 生成版本 3 证书

clojure - 为什么我的传感器功能比使用 ->> 运算符慢?

javascript - 在 RxJS 中制作打字计时器;跟踪打字时间

android - RxJava android - 是否观察者 block

java - 在 Appium Java Android 测试中单击菜单项时页面源未更新

java - Java : S to Start and Q to Quit中的错误检查

javascript - 为什么 JavaScript 中的不变性如此重要(或需要)?

java - 如何在 Java 中实现列表折叠

java - 如果 RxJava observable 需要很长时间,你如何显示微调器?