reactive-programming - 延迟项目发射,直到项目从另一个 observable 发射

标签 reactive-programming rx-java

现在在玩 RxJava,偶然发现了以下问题:

我有 2 个不同的流:

  • 带有项目的流
  • Stream(只有 1 个项目),它发出第一个流的转换信息。

  • 所以基本上我有项目流,我希望所有这些项目与第二个流中的单个项目组合:

    ----a1----a2----a3----a4----a5----|--------------->

    -------------b1---|-------------------------------- -->

    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    ------------a1b1-a2b1-a3b1-a4b1-a5b1-------->

    看起来真的很像 combileLatest 运算符,但 combineLatest 将忽略第一个流中的所有项目,除了最接近第二个流中的项目。这意味着我不会收到a1b1 - 发出的第一个结果项目将是 a2b1 .

    我也看了 delay 运算符,但它不允许我指定 close流就像用 buffer 完成的一样运算符(operator)

    有没有什么花哨的运算符可以解决上述问题?

    最佳答案

    有几种方法可以实现这一点:

    1) flatMapb如果您不需要启动a预先

    b.flatMap(bv -> a.map(av -> together(av, bv)));
    

    2) 你当然可以,cache但它会保留您的 a s 在整个流期间。

    3) 使用 groupBy 有点非常规,因为它的 GroupedObservable 缓存值直到单个订阅者到达,重放缓存的值并继续作为常规的直接 observable(让所有以前的缓存值消失)。
    Observable<Long> source = Observable.timer(1000, 1000, TimeUnit.MILLISECONDS)
            .doOnNext(v -> System.out.println("Tick"))
            .take(10);
    Observable<String> other = Observable.just("-b").delay(5000, TimeUnit.MILLISECONDS)
            .doOnNext(v -> System.out.println("Tack"))
            ;
    
    source.groupBy(v -> 1)
    .flatMap(g -> 
        other.flatMap(b -> g.map(a -> a + b))
    ).toBlocking().forEach(System.out::println);
    

    它的工作原理如下:
  • 捕获一个 GroupedObservable通过将源中的所有内容分组到组 1。
  • 当群g到达,我们“开始观察”另一个可观察的。
  • 一旦其他人触发了它的元素,我们就将其映射到组上并“开始观察”它,从而为我们带来 a + b 的最终序列。 s。

  • 我添加了 doOnNexts 以便您可以看到源在 other 之前确实处于事件状态发射它的“Tack”。

    关于reactive-programming - 延迟项目发射,直到项目从另一个 observable 发射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30227373/

    相关文章:

    java - RX java 将可观察量与一对多关系相结合

    r - Shiny 的非 react 范围

    c# - 如何以 react 方式将字节分组到消息中

    java - RxJava retryWhen 和 onError

    scala - 创建 rx Observable 后添加元素

    javascript - 使用 RxJS 模拟命令队列和撤消堆栈

    javascript - RxJs 中的 Subject 和 Angular2 中的 EventEmitter

    android - 通过改造处理Rxjava中的无网络

    error-handling - 如何避免在onCompleted中引发的异常被吞下

    java - 将 CompletableFuture 与 RxJava 抽象结合使用