我正在使用RxAndroid进行流操作。在我的实际用例中,我正在从服务器中获取列表(使用Retrofit)。我正在使用调度程序在后台线程上进行工作,并在Android UI(主)线程上获得最终发射。
这对于网络调用来说效果很好,但是我意识到在网络调用之后,我的运算符(operator)不使用后台线程,而是在主线程上调用它。
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> {});
如何确保所有操作均在后台线程上执行?
最佳答案
TL; DR :将observeOn(AndroidSchedulers.mainThread())
移到filter(...)
以下。subscribeOn(...)
用于指定Observable
将在哪个线程上开始操作。随后对subscribeOn
的调用将被忽略。
因此,如果要编写以下内容,则所有操作都将在Schedulers.newThread()
上执行:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
现在,当然,这不是您想要的:您想在主线程上使用
doSomething
。这就是
observeOn
的位置。在该调度程序上执行observeOn
之后的所有操作。因此,在您的示例中,filter
在主线程上执行。而是将
observeOn
下移到subscribe
之前:myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
现在,
filter
将出现在“新线程”上,而doSomething
将出现在主线程上。要走得更远,您可以多次使用
observeOn
:myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
在这种情况下,提取将在新线程上进行,过滤将在计算线程上进行,而
doSomething
将在主线程上进行。check out ReactiveX - SubscribeOn operator以获取正式文档。
关于rx-java - 在后台线程上可观察到的进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33377832/