我正在尝试压缩两个在不同线程上发出的 Observable:
Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "First";
})
.doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation());
Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Second";
})
.doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io());
Observable.zip(xxxx1, xxxx2, (s1, s2) -> {
Log.d("TEEEST", "zip func thread " + Thread.currentThread().getName());
return s1.concat(s2);
})
.map(s -> {
Log.d("TEEEST", "map thread " + Thread.currentThread().getName());
return s.concat(" mapped");
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
Log.d("TEEEST", "call " + s + " thread " + Thread.currentThread().getName());
});
似乎 zip 在最后一个发出值的线程上工作,这是我的日志的样子:
第一次运行:
D/TEEEST: 2 onNext Second thread RxIoScheduler-3
D/TEEEST: 1 onNext First thread RxComputationScheduler-1
D/TEEEST: zip func thread RxComputationScheduler-1
D/TEEEST: map thread RxComputationScheduler-1
D/TEEEST: call FirstSecond mapped thread main
第二次运行:
D/TEEEST: 1 onNext First thread RxComputationScheduler-2
D/TEEEST: 2 onNext Second thread RxIoScheduler-2
D/TEEEST: zip func thread RxIoScheduler-2
D/TEEEST: map thread RxIoScheduler-2
D/TEEEST: call FirstSecond mapped thread main
- 是否在某处记录了此行为?
- 为什么会这样。
- 如何确保 zip 函数和所有下游内容(在我的例子中是
map
运算符)在特定的调度器上工作,而不是在随机的调度器上工作。
最佳答案
zip
默认情况下不会在特定的 Scheduler
上运行。
zip
仅在具有要压缩的所有值时发出。所以它在接收最后一个值的同一个线程上发出。
为了确保所有下游操作都发生在特定的调度程序上,您必须定义一个 observeOn()
副作用。
对于所有下游操作,观察压缩结果就足够了。
Observable.zip(...).observeOn(scheduler)
对于上游,你必须观察观察者被压缩到这个特定的调度器上。
Observable.zip(o1.observeOn(scheduler), o2.observeOn(scheduler), ...)
根据您要使用的调度程序,这并不能保证线程。
关于android - 为什么 RxJava zip 运算符在最后发出值的线程上工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44586663/