我正在将我的项目从 Rx v1 转换为 Rx v2,目前我正在将一些 v1 Observable
更改为 v2 Flowable
.
(它在 Android 项目中,使用 Spock 在 Groovy 中编写单元测试)
通常我会使用钩子(Hook)覆盖调度程序。我仍然可以通过注册调度程序处理程序在 v2 中执行此操作。这使得 Observable
始终使用(新的?)Schedulers.single()
同步。但是,由于背压机制(?),Flowable
仍然是异步的。
我尝试使用以下方法解决该问题:
Flowable<LogEntry> flowable = Flowable.create(new FlowableOnSubscribe<LogEntry>() {
@Override
void subscribe(FlowableEmitter<LogEntry> emitter) throws Exception {
for (def log : logs) {
emitter.onNext(log)
}
emitter.onComplete()
}
}, FlowableEmitter.BackpressureMode.NONE);
但这仍然使它们异步。
我已经像这样覆盖了调度器:
RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
@Override
Scheduler apply(Scheduler scheduler) throws Exception {
return Schedulers.single()
}
})
RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
@Override
Scheduler apply(Scheduler scheduler) throws Exception {
return Schedulers.from(new Executor() {
@Override
void execute(Runnable command) {
command.run()
}
})
}
})
我似乎无法弄清楚为什么 Observable
会像这样同步,但 Flowable
却不会(除了背压机制)
最佳答案
Schedulers.single()
是一个单线程异步调度程序。您需要 Schedulers.trampoline()
才能保持在同一个线程上。
关于java - 为 Rx v2 Flowable 编写同步单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39625324/