看看这两个小测试:
@Test
public void test1() {
Observable.range(1, 10)
.groupBy(v -> v % 2 == 0)
.flatMap(group -> {
if (group.getKey()) {
return group;
}
return group;
})
.subscribe(System.out::println);
}
@Test
public void test2() {
Observable.range(1, 10)
.groupBy(v -> v % 2 == 0)
.toMap(g -> g.getKey())
.flatMapObservable(m -> Observable.merge(
m.get(true),
m.get(false)))
.subscribe(System.out::println);
}
我期望两者都以相同的顺序返回数字列表,所以:
1 2 3 4 5 6 7 8 9 10
但是第二个例子返回
2 4 6 8 10 1 3 5 7 9
相反。
似乎在第二个示例中,merge
执行的是 concat
,事实上,如果我将其更改为 concat
,结果是一样的。
我错过了什么?
谢谢。
最佳答案
基本上flatMap
和 merge
不保证发出的项目的顺序。
来自 flatMap文档:
Note that FlatMap merges the emissions of these Observables, so that they may interleave.
来自 merge文档:
Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).
引自此SO Answer :
In your case, with single-element, static streams, it is not making any real difference (but in theory, merge could output words in random order and still be valid according to spec)
如果您需要保证订单,请使用 concat*
相反。
第一个例子
它的工作原理是这样的:
- 当
1
发出groupBy
运算符(operator)将创建一个GroupedObservable
用 keyfalse
-
flatMap
将从这个可观察对象中输出项目——目前只有1
-
- 当
2
发出groupBy
运算符(operator)将创建一个GroupedObservable
用 keytrue
-
flatMap
现在还将输出第二个GroupedObservable
中的项目- 当前为2
-
- 当
3
发出groupBy
运算符(operator)会将其添加到现有的GroupedObservable
用 keyfalse
和flatMap
将立即输出此项目 - 当
4
发出groupBy
运算符(operator)会将其添加到现有的GroupedObservable
用 keytrue
和flatMap
将立即输出此项目 - 等等
它可能会帮助您添加更多日志记录:
Observable.range(1, 10)
.groupBy(v -> v % 2 == 0)
.doOnNext(group -> System.out.println("key: " + group.getKey()))
.flatMap(group -> {
if (group.getKey()) {
return group;
}
return group;
})
.subscribe(System.out::println);
那么输出是:
key: false
1
key: true
2
3
...
第二个例子
这是完全不同的,因为toMap
将阻塞直到上游完成:
- 当
1
发出groupBy
运算符(operator)将创建一个GroupedObservable
用 keyfalse
-
toMap
将添加此GroupedObservable
到内部映射并使用键false
(与GroupedObservable
具有相同的 key )
-
- 当
2
发出groupBy
运算符(operator)将创建一个GroupedObservable
用 keytrue
-
toMap
将添加此GroupedObservable
到内部映射并使用键true
(与GroupedObservable
具有相同的键) - 所以现在 map 有 2GroupedObservables
-
- 将以下数字添加到相应的
GroupedObservables
当源完成时,toMap
运算符(operator)完成并将 map 传递给下一个运算符(operator) - 在
flatMapObservable
您使用 map 创建一个新的可观察对象,首先添加偶数元素 (key =true
),然后添加奇数元素 (key =false
)
您还可以在此处添加更多日志记录:
Observable.range(1, 10)
.groupBy(v -> v % 2 == 0)
.doOnNext(group -> System.out.println("key: " + group.getKey()))
.toMap(g -> g.getKey())
.doOnSuccess(map -> System.out.println("map: " + map.size()))
.flatMapObservable(m -> Observable.merge(
m.get(true),
m.get(false)
))
.subscribe(System.out::println);
那么输出是:
key: false
key: true
map: 2
2
4
6
8
10
1
3
5
7
9
关于java - RxJava : merge() changes the order of the emitted items?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51440018/