android - 为什么 RxJava zip 运算符在最后发出值的线程上工作?

标签 android multithreading rx-java

我正在尝试压缩两个在不同线程上发出的 Observable:

Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "First";
})
        .doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.computation());

Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Second";
})
        .doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io());

Observable.zip(xxxx1, xxxx2, (s1, s2) -> {
    Log.d("TEEEST", "zip func thread " + Thread.currentThread().getName());
    return s1.concat(s2);
})
        .map(s -> {
            Log.d("TEEEST", "map thread " + Thread.currentThread().getName());
            return s.concat(" mapped");
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            Log.d("TEEEST", "call " + s + " thread " + Thread.currentThread().getName());
        });

似乎 zip 在最后一个发出值的线程上工作,这是我的日志的样子:

第一次运行:

D/TEEEST: 2 onNext Second thread RxIoScheduler-3
D/TEEEST: 1 onNext First thread RxComputationScheduler-1
D/TEEEST: zip func thread RxComputationScheduler-1
D/TEEEST: map thread RxComputationScheduler-1
D/TEEEST: call FirstSecond mapped thread main

第二次运行:

D/TEEEST: 1 onNext First thread RxComputationScheduler-2
D/TEEEST: 2 onNext Second thread RxIoScheduler-2
D/TEEEST: zip func thread RxIoScheduler-2
D/TEEEST: map thread RxIoScheduler-2
D/TEEEST: call FirstSecond mapped thread main
  1. 是否在某处记录了此行为?
  2. 为什么会这样。
  3. 如何确保 zip 函数和所有下游内容(在我的例子中是 map 运算符)在特定的调度器上工作,而不是在随机的调度器上工作。

最佳答案

zip 默认情况下不会在特定的 Scheduler 上运行。

zip 仅在具有要压缩的所有值时发出。所以它在接收最后一个值的同一个线程上发出。

为了确保所有下游操作都发生在特定的调度程序上,您必须定义一个 observeOn() 副作用。

对于所有下游操作,观察压缩结果就足够了。

Observable.zip(...).observeOn(scheduler)

对于上游,你必须观察观察者被压缩到这个特定的调度器上。

Observable.zip(o1.observeOn(scheduler), o2.observeOn(scheduler), ...)

根据您要使用的调度程序,这并不能保证线程。

关于android - 为什么 RxJava zip 运算符在最后发出值的线程上工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44586663/

相关文章:

java - show() 时弹出菜单崩溃;

java - Android 如何识别 GS1 DataBar 条形码?

android - 在 .xml 文件中除以零错误

php - php 是以多线程还是多进程的方式处理并发请求的?

android - 如何避免重复常见的可观察配置?

java语言语法

c - 创建线程时,有些线程的线程 ID 为 0

php - “Fatal error class ' 尝试使用 PHP pthreads 时线程 ' not found in…”

android - RxJava - 如何在 Android 中的 4 个点击事件中调用订阅者

android - 根据结果​​链接可观察对象