java - 热 Observables 的 RxJava 延迟

标签 java rx-java delay

我看到这个问题here 。 这是关于实现每个发出的项目的延迟。这是根据accepted answer如何实现的:

Observable.zip(Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
  System.out.println(System.currentTimeMillis() - timeNow);
  System.out.println(item);
  System.out.println(" ");
}).toList().toBlocking().first();

在问题中,提问者特别要求一组固定的可观察量(Observable.range(1,5)),不幸的是这不是我想要实现的。

我也看到了这个comment .

这个评论就是我想要实现的。因此,我的源可观察对象以比间隔更慢(有时更快)的速率发出项目。而且 observable 的发出是永无止境的。

===

所以基本上我希望热可观察量具有最小的延迟。

例如,如果我想要 400 毫秒的最小延迟,并且我有这种可观察到的发射度:

X1-100ms delay-X2-200ms delay-X3-600ms delay-X4-20000ms delay-X5-...

我希望它产生:

X1-400ms delay-X2-400ms delay-X3-600ms delay-X4-20000ms delay-X5-...

有人有任何想法来实现这一目标吗?

最佳答案

你的要求好奇怪……

我可以解决它,但不优雅。这是我的代码:

class Three<A, B, C> {
    A a;
    B b;
    C c;
    // Getter, Setter, Constructor
  }

  public static void main(String[] args) throws Exception {
    BehaviorSubject<Integer> s = BehaviorSubject.create();
    // Three = (The value, upstream comes mills, downstream emits mills)
    s.map(i -> new Three<>(i, System.currentTimeMillis(), System.currentTimeMillis()))
        .scan((a, b) -> {
          b.setC(a.getC() + Math.max(400L, b.getB() - a.getB()));
          return b;
        })
        .concatMap(i -> Observable.just(i.getA()).delay(Math.max(0, i.getC() - System.currentTimeMillis()),
            TimeUnit.MILLISECONDS))
        .subscribe(i -> System.out.println(i + "\t" + System.currentTimeMillis()));
    s.onNext(0);
    Thread.sleep(100);
    s.onNext(1);
    Thread.sleep(200);
    s.onNext(2);
    Thread.sleep(600);
    s.onNext(3);
    Thread.sleep(2000);
    s.onNext(4);
    Thread.sleep(200);
    s.onNext(5);
    Thread.sleep(800);
    s.onNext(6);
    Thread.sleep(1000);
  }

和输出

0   1510128693984
1   1510128694366 // 400ms
2   1510128694766 // 400ms
3   1510128695366 // 600ms
4   1510128697366 // 2000ms
5   1510128697766 // 400ms
6   1510128698567 // 800ms

关于java - 热 Observables 的 RxJava 延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47157200/

相关文章:

java - 使用套接字在 java 同步方法中抛出 NullPointerException

rx-java - RxJava 链接 observables 和错误处理(自定义异常传播)

javascript - jquery 延迟更改排队函数关闭?

c - pthread超时或取消,使用pthread_cond_timedwait,还是什么?

java - EJB bean 中的 @Schedule 注释和超时

java - Eclipse - 错误 : Main method not found in class projectOne, 请将 main 方法定义为:public static void main(String[] args)

android - 将 RxAndroid 导入 Android Studio

javascript - touchend 上的延迟操作

java - Java 数组的克隆方法

rx-java - rxjava 行为主题删除/清除值