rx-java - 在后台线程上可观察到的进程

标签 rx-java rx-android

我正在使用RxAndroid进行流操作。在我的实际用例中,我正在从服务器中获取列表(使用Retrofit)。我正在使用调度程序在后台线程上进行工作,并在Android UI(主)线程上获得最终发射。

这对于网络调用来说效果很好,但是我意识到在网络调用之后,我的运算符(operator)不使用后台线程,而是在主线程上调用它。

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});

如何确保所有操作均在后台线程上执行?

最佳答案

TL; DR :将observeOn(AndroidSchedulers.mainThread())移到filter(...)以下。
subscribeOn(...)用于指定Observable将在哪个线程上开始操作。随后对subscribeOn的调用将被忽略。

因此,如果要编写以下内容,则所有操作都将在Schedulers.newThread()上执行:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });

现在,当然,这不是您想要的:您想在主线程上使用doSomething
这就是observeOn的位置。在该调度程序上执行observeOn之后的所有操作。因此,在您的示例中,filter在主线程上执行。

而是将observeOn下移到subscribe之前:
myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

现在,filter将出现在“新线程”上,而doSomething将出现在主线程上。

要走得更远,您可以多次使用observeOn:
myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

在这种情况下,提取将在新线程上进行,过滤将在计算线程上进行,而doSomething将在主线程上进行。

check out ReactiveX - SubscribeOn operator以获取正式文档。

关于rx-java - 在后台线程上可观察到的进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33377832/

相关文章:

android - 从 Observable 获取元素的索引

java - 如何通过超时简化 rxjava 流

android - 如何使用 RxJava 逃离这个回调 hell

android - 验证 rxjava 订阅者中的交互

android - 如何在 WorkManager 中配置 Rx?

android - 在 Android 上调试时 RxJava 缓存线程中的 InterruptedException

android - SwitchIfEmpty 没有在里面执行 maybesource

java - 如何使用 RxJava 逐个读取字符串数组成员并调用网络 API 获取第一个结果?

java - 仅当其他方法完成时才调用方法 - RxJava

rx-java - 需要 RXJava 帮助来替换 AsyncTask