所以首先我将向您展示我所拥有的以及我认为那里会发生什么:
我有一个BehavourSubject<DataObject>
:
private BehaviorSubject<DataObject> dataSubject = BehaviorSubject.create();
我将其返回到某个函数中,如下所示:
public Observable<DataObject> pendingData() {
return this.dataSubject.asObservable()
.doOnNext(data -> {
// do something with this data that has to be thread save.
})
.observeOn(AndroidSchedulers.mainThread());
}
我假设发生的情况是,doOnNext 部分将在同一个线程中运行,即 this.dataSubject.onNext(data);
叫做。但是当我做一些必须在这个 lambda 中进行线程保存的事情时,我应该将其放入信号量中或运行所有 doOnNext
某个线程中的操作。
我的第一个想法是“处理 rx 中线程的正常方法”,但我不知道它是否有效。
我想添加一个 subscribeOn(certainBackgroundScheduler)
像这样的可观察值:
public Observable<DataObject> pendingData() {
return this.dataSubject.asObservable()
.doOnNext(data -> {
// do something with this data that has to be thread save.
})
.subscribeOn(certainBackgroundScheduler)
.observeOn(AndroidSchedulers.mainThread());
}
但是当我使用订阅 block 创建一个 Observable 时,这个 block 就在 backgroundScheduler
中运行。 。当我调用onNext
时在订阅者上,我在该线程中调用它,这是合乎逻辑的,但它在BehaviorSubject中是否相同?
真的有这么简单吗?如果没有,我如何强制主体运行 doOnNext
阻止我的某个线程?
最佳答案
您可以在链中拥有多个 observeOn
,这样您就可以在不同的执行“位置”之间路由值。
dataSubject
.observeOn(backgroundScheduler)
.doOnNext(v -> /* this will run on another scheduler. */)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(v -> /* this will run on main after the previous */)
关于java - 在某个线程中的 jaBehaviorSubject 上运行 doOnNex,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41057265/