rx-java - RxJava : how to emulate withLatestFrom?

标签 rx-java

根据文档,withLatestFrom Java 实现中缺少该内容(与 combineLatest 略有不同)。关于如何模仿有什么想法吗? enter image description here

最佳答案

a 作为主要可观察量,将 b 作为“最新”可观察量,这个伪 java8 lambda 应该执行您想要的操作:

a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))

这首先将 a 发布为 a',这允许重复订阅它,而无需重新启动流。然后,每次从 b 发出一个新项目时,它都会重新订阅 a 的持续流,该流将 b 的最新输出与a 的每个输出。

您可以轻松地将其包装为 RxJava 的 Transformer 的实现,如下所示(也是半伪的,因此请检查我的语法):

 public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
   private final Func2<T, U, V> function;
   private final Observable<U> latest;

   private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
     this.function = function;
     this.latest = latest;
   }

   public static <T, U, V> WithLatestFrom<T, U, V> with(
       final Observable<U> latest, Func2<T, U, V> function) {
     return new WithLatestFrom<T, U, V>(latest, function);
   }

   @Override
   public Observable<V> call(final Observable<T> source) {
     return source.publish((publishedSource) -> latest.switchMap((y) ->
         publishedSource.map((x) -> function.call(x, y)));
   }

}

然后您可以在代码中重用它,例如:

a.compose(WithLatestFrom.with(b, (x, y) -> x + y));

关于rx-java - RxJava : how to emulate withLatestFrom?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28580490/

相关文章:

android - RXJava - 制作一个可暂停的可观察对象(例如使用缓冲区和窗口)

java - RxJava 调度程序不会通过 sleep 更改线程

java - 可观察超时

java - Rx Observables : emit additional item for each original item, 将它们减少为另一种类型,消耗

java - 如何计数并阻止在另一个线程上执行的 Observable?

java - RxJava,如果存在则获取第一项

java-8 - 如何从 2 个不同的 RxJava Observables 获取 2 个值?

Observable<T>.subscribe 上的 Kotlin 扩展函数不起作用

android - 在 android 上创建一个可观察的发射器

java - 将 io.projectreactor 版本从 2.0.x 升级到 3.0.4 - 使用 Spring 框架