rx-java - Rx 跳过,直到经过几秒

标签 rx-java rx-java2 rx-android

我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。

例子

  • observable A 每秒发出一次“滴答声”
  • observable B 应该每 5 秒发出一次这个“滴答声”,忽略 介于两者之间。

也就是说,跳过每个项目,直到某个时间跨度过去。

不是去抖动,因为去抖动需要原始 Observable 保持 5 秒发射,但我的每秒发射一次。

最佳答案

请看看我的解决方案。 'tickEverySecond' 将每秒发出一个值。 'result' 将在 5 秒的时间窗口内收集 'tickEverySecond' 发出的所有项目。每隔 5 秒窗口,'tickEverySecond' 的最后一个发射值将被推送到订阅者。

@Test
  public void name() {
    TestScheduler testScheduler = new TestScheduler();
    Flowable<Long> tickEverySecond = Flowable.interval(0, 1, TimeUnit.SECONDS, testScheduler);

    Flowable<Long> result = tickEverySecond.window(5, TimeUnit.SECONDS, testScheduler).flatMap(longFlowable -> longFlowable.takeLast(1));

    TestSubscriber<Long> test = result.test();

    testScheduler.advanceTimeTo(4, TimeUnit.SECONDS);
    test.assertValueCount(0).assertNotComplete();

    testScheduler.advanceTimeTo(5, TimeUnit.SECONDS);
    test.assertValue(4L).assertNotComplete();

    testScheduler.advanceTimeTo(9, TimeUnit.SECONDS);
    test.assertValues(4L).assertNotComplete();

    testScheduler.advanceTimeTo(10, TimeUnit.SECONDS);
    test.assertValues(4L, 9L).assertNotComplete();

    testScheduler.advanceTimeTo(14_999, TimeUnit.MILLISECONDS);
    test.assertValues(4L, 9L).assertNotComplete();
  }

您可以使用以下运算符获得相同的效果:

Flowable<Long> result = tickEverySecond.throttleLast(5, TimeUnit.SECONDS, testScheduler);

关于rx-java - Rx 跳过,直到经过几秒,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47818947/

相关文章:

system.reactive - 使用 RxJava 并行调用网络服务。这是正确的方法吗?

java - RxJava 平面图 : How to skip errors?

java - RXJava2中几种方法的组合

java - 在 react 流中哪里放置参数验证?

android - 如果发生异常,则将 flatMap 短路

android - 如何在 rxjava 和改造中每小时重试请求 5 次

java - 主线程中的SubscribeOn和observeOn

android - 使用 RxJava 转换 HashMap 值

android - Rx-Android 库导入 rx.android.plugins vs io.reactivex.android.plugins

android - RxAndroid 3 主线程