现在在玩 RxJava,偶然发现了以下问题:
我有 2 个不同的流:
所以基本上我有项目流,我希望所有这些项目与第二个流中的单个项目组合:
----a1----a2----a3----a4----a5----|--------------->
-------------b1---|-------------------------------- -->
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
------------a1b1-a2b1-a3b1-a4b1-a5b1-------->
看起来真的很像
combileLatest
运算符,但 combineLatest 将忽略第一个流中的所有项目,除了最接近第二个流中的项目。这意味着我不会收到a1b1
- 发出的第一个结果项目将是 a2b1
.我也看了
delay
运算符,但它不允许我指定 close
流就像用 buffer
完成的一样运算符(operator)有没有什么花哨的运算符可以解决上述问题?
最佳答案
有几种方法可以实现这一点:
1) flatMap
在 b
如果您不需要启动a
预先
b.flatMap(bv -> a.map(av -> together(av, bv)));
2) 你当然可以,
cache
但它会保留您的 a
s 在整个流期间。3) 使用 groupBy 有点非常规,因为它的 GroupedObservable 缓存值直到单个订阅者到达,重放缓存的值并继续作为常规的直接 observable(让所有以前的缓存值消失)。
Observable<Long> source = Observable.timer(1000, 1000, TimeUnit.MILLISECONDS)
.doOnNext(v -> System.out.println("Tick"))
.take(10);
Observable<String> other = Observable.just("-b").delay(5000, TimeUnit.MILLISECONDS)
.doOnNext(v -> System.out.println("Tack"))
;
source.groupBy(v -> 1)
.flatMap(g ->
other.flatMap(b -> g.map(a -> a + b))
).toBlocking().forEach(System.out::println);
它的工作原理如下:
GroupedObservable
通过将源中的所有内容分组到组 1。g
到达,我们“开始观察”另一个可观察的。 a + b
的最终序列。 s。 我添加了 doOnNexts 以便您可以看到源在
other
之前确实处于事件状态发射它的“Tack”。
关于reactive-programming - 延迟项目发射,直到项目从另一个 observable 发射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30227373/