java - Rx-java 2 : How to set priority between Observables?

标签 java rx-java rx-java2

我有一个基于时间的算法,在某些情况下,不同的可观测值可能在同一时刻发生两次发射。

请参阅下面的代码:

   int moment = ...; //calculated
   Observable.timer(moment, unit)
    .takeUntil(messages) 
    .map(e -> new Action())

messages Observable 在 Schedulers.computation() 上执行,就像计时器一样。

当消息与计时器同时到达时,行为是随机的。有时会发出新的 Action,有时则不会,takeUntil(messages) 优先。

我需要完全控制这种行为。当然,我可以在 moment 中添加几毫秒,这样它就总是在 messages 之后执行,但这不是我想要的。

如何告诉 RxJava 可观察的消息优先于计时器?

更新:

为了澄清问题,我添加了这个测试,每次执行它都会成功

    //before launching the test
    RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
                        new TestScheduler());


    @Test
    public void rx_precedence() {

        int moment = 5;
        Observable<Long> messages = Observable.timer(moment, TimeUnit.SECONDS);
        TestObserver<Long> testObserver = Observable.timer(moment, TimeUnit.SECONDS)
                .takeUntil(messages)
                .test();

        scheduler.advanceTimeTo(moment, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertTerminated();
    }

在同一时刻,执行takeUntil并且不发出任何值。

但在服务器上,在现实世界中,当 je JVM 同时执行多项操作时,timer 可能优先于 takeUntil

我需要一种方法来使这种优先级在任何环境下都稳定,这样当messagestimer耗尽的同时发出一个值时,就不会发出任何值。

最佳答案

你需要Observable.amb()我想 - http://reactivex.io/documentation/operators/amb.html

更新: 好的,现在问题更清楚了。也许 Scheduler.single() 会有所帮助。

关于java - Rx-java 2 : How to set priority between Observables?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54631376/

相关文章:

java - 将输入字符串日期时间从不同时区转换为 UTC

java - 当值引用回键时如何使用弱引用进行缓存?

rx-java - RxJava javadoc 在哪里?

java - 是否可以在 libgdx Android 项目中使用 RxJava?

java - RxJava map() 在赋值时返回值吗?

android - 向 CompositeDisposable 添加大量 Disposable 的潜在危害

java - 用 RxJava 替换异步任务

java - Android AIDE lambda

Java计算最长可能的数组

rx-java - Rxjava Schedulers.newThread() 上限