java - 为 Rx v2 Flowable 编写同步单元测试

标签 java android rx-java reactive-programming rx-android

我正在将我的项目从 Rx v1 转换为 Rx v2,目前我正在将一些 v1 Observable 更改为 v2 Flowable .

(它在 Android 项目中,使用 Spock 在 Groovy 中编写单元测试)

通常我会使用钩子(Hook)覆盖调度程序。我仍然可以通过注册调度程序处理程序在 v2 中执行此操作。这使得 Observable 始终使用(新的?)Schedulers.single() 同步。但是,由于背压机制(?),Flowable 仍然是异步的。

我尝试使用以下方法解决该问题:

Flowable<LogEntry> flowable = Flowable.create(new FlowableOnSubscribe<LogEntry>() {
    @Override
    void subscribe(FlowableEmitter<LogEntry> emitter) throws Exception {
        for (def log : logs) {
            emitter.onNext(log)
        }

        emitter.onComplete()
    }
}, FlowableEmitter.BackpressureMode.NONE);

但这仍然使它们异步。

我已经像这样覆盖了调度器:

RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.single()
    }
})

RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.from(new Executor() {
            @Override
            void execute(Runnable command) {
                command.run()
            }
        })
    }
})

我似乎无法弄清楚为什么 Observable 会像这样同步,但 Flowable 却不会(除了背压机制)

最佳答案

Schedulers.single() 是一个单线程异步调度程序。您需要 Schedulers.trampoline() 才能保持在同一个线程上。

关于java - 为 Rx v2 Flowable 编写同步单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39625324/

相关文章:

java - DisposableObserver 与(常规)观察者

java - 构造函数是必要的还是仅仅为了方便而使用?

java - 如何在Java中使用Gradle Test类的afterSuite方法?

java - Sonarqube 和 Jacoco Gradle 插件

java - 以编程方式清除缓存的后台进程

android - Rx Java Android 中的混淆

java - Lucene 4.0 获取字段信息

java - LineStyleBuilder 等无法解析 setColor() 等函数

android - 如何在 webView 的 url 中发送 referer 请求

Android RXJava 清理解决方案