java - TestScheduler 不适用于 RxJava

标签 java rx-java

我正在尝试测试一个函数,其中流的元素在延迟后一个接一个地分派(dispatch),我能够使用 Thread.sleep 使我的测试工作。但是,当我使用 TestScheduler.advanceTimeBy 时,我得不到任何结果。

查看代码:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                      .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}

和测试代码:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);

model.getDelayedObjects(objectStreamSubject)
        .observeOn(testScheduler)
        .subscribeOn(testScheduler)
        .subscribe(testSubscriber);

testScheduler.triggerActions();
//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);

更新:

检查 TestScheduler 用法我发现通常将调度程序传递给 delay 函数。因此,通过将调度程序作为参数提供给方法 getDelayedObjects 然后再提供给 delay,我能够让测试通过。但是,我仍然不明白为什么它以前不起作用。

最佳答案

默认情况下,delay 运算符使用 computation scheduler用于执行基于时间的延迟。此信息可在 documentation 中找到的方法。在 @SchedulerSupport 注释中查找值,在本例中为 io.reactivex:computation

出于测试目的,您必须将计算调度程序替换为 TestScheduler .为了能够替换,您必须使用 one of the many overrides包含 Schedulerdelay 运算符。

关于java - TestScheduler 不适用于 RxJava,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40457078/

相关文章:

android - Realm 查询不更新我的适配器

java - 无法在我的可执行 jar 中包含外部 jar

java - 使用优先级队列的排序列表的迭代器

java - 将日期加载到elasticsearch时,我的日期字段存在问题。日期不正确

android - 将 rxJava 与 MVP 结合使用的 GoogleMaps

android - RxJava : How to extract same observeOn from different functions?

java - rxjava : emitting one result of all observables instead of multiple

java - HTTP组件编码问题

java - 如何在 Vert.x 中进行有效的复合协调编程

kotlin - 如何从rxjava中的列表列表执行过滤器运算符