java - 在某个线程中的 jaBehaviorSubject 上运行 doOnNex

标签 java multithreading thread-safety rx-java

所以首先我将向您展示我所拥有的以及我认为那里会发生什么:

我有一个BehavourSubject<DataObject> :

private BehaviorSubject<DataObject> dataSubject = BehaviorSubject.create();

我将其返回到某个函数中,如下所示:

public Observable<DataObject> pendingData() {
    return this.dataSubject.asObservable()
        .doOnNext(data -> {
            // do something with this data that has to be thread save.
        })
        .observeOn(AndroidSchedulers.mainThread());
}

我假设发生的情况是,doOnNext 部分将在同一个线程中运行,即 this.dataSubject.onNext(data);叫做。但是当我做一些必须在这个 lambda 中进行线程保存的事情时,我应该将其放入信号量中或运行所有 doOnNext某个线程中的操作。

我的第一个想法是“处理 rx 中线程的正常方法”,但我不知道它是否有效。 我想添加一个 subscribeOn(certainBackgroundScheduler)像这样的可观察值:

public Observable<DataObject> pendingData() {
    return this.dataSubject.asObservable()
        .doOnNext(data -> {
            // do something with this data that has to be thread save.
        })
        .subscribeOn(certainBackgroundScheduler)
        .observeOn(AndroidSchedulers.mainThread());
}

但是当我使用订阅 block 创建一个 Observable 时,这个 block 就在 backgroundScheduler 中运行。 。当我调用onNext时在订阅者上,我在该线程中调用它,这是合乎逻辑的,但它在BehaviorSubject中是否相同?

真的有这么简单吗?如果没有,我如何强制主体运行 doOnNext阻止我的某个线程?

最佳答案

您可以在链中拥有多个 observeOn,这样您就可以在不同的执行“位置”之间路由值。

dataSubject
.observeOn(backgroundScheduler)
.doOnNext(v -> /* this will run on another scheduler. */)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(v -> /* this will run on main after the previous */)

关于java - 在某个线程中的 jaBehaviorSubject 上运行 doOnNex,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41057265/

相关文章:

java - 如何不重复else语句

java - JPanel 布局 - 添加文本框并重新排列组件

c# - 一次性方法的线程安全实现

multithreading - 每个 Hadoop map task 使用多少个核心?

java - 与Java的LinkedList类的并发

java - 带有tomcat的spring boot web应用程序在启动时挂起

Java文件上传大小检查

java - Android 工作线程卡住主线程

c - OpenMP 和函数

java - 线程安全的多重集