multithreading - RxJava 并行化是否打破了 Observable 契约?

标签 multithreading parallel-processing reactive-programming rx-java

本·克里斯滕森 posted here目前在 RxJava 中实现并行性的最佳方法是创建另一个 Observable 并在调度程序上订阅它,如下所示。

streamOfItems.flatMap(item -> {
   doStuffWithItem(item).subscribeOn(Schedulers.io());
});

然而,Observable Contract表示 onNext() 调用可以调用任意次数,只要调用不重叠即可。好吧,链的其余部分中遵循上述操作的任何操作符现在都可以轻松打破该规则(除非他们明确地进行某种同步/序列化)。

我的印象是 RxJava 更喜欢一次在一个线程上保持发射流,并在特定运算符处将稳定的顺序流从一个线程切换到另一个线程,但从不并行(如下所示)。

observeOn() thread     -------------------------Y----Y----Y-------------
subscribeOn() thread   ----X----X----X----X-----------------------------

通过并行方法,我知道图表可能看起来像这样,而且我觉得它看起来很重叠。

par subscribeOn() thread 3    -------------------------Y-----Y---------------
par subscribeOn() thread 2    ---------------------------Y---Y---------------
par subscribeOn() thread 1    -------------------------Y-------------Y-------
initial subscribeOn() thread  ----X----X----X----X---------------------------

我是否误解了什么或做出了广泛的假设?并行性不会破坏 Observable 契约吗?这是否使其在某种程度上不受欢迎?

最佳答案

如果您使用标准运算符,则不会破坏 Observable 契约,因为无论何时可能发生并发,运算符都会序列化它们的输出。在您的示例中,flatMap 这样做是为了保证其输出是连续的(尽管接收线程可能来回切换)。

但是,如果同一管道的不同阶段由异步边界或可能进行线程仲裁的运算符分隔,则通常情况并非如此。

关于multithreading - RxJava 并行化是否打破了 Observable 契约?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33205661/

相关文章:

R:获取给定函数内所有变量和函数的列表和环境(用于并行处理)

parallel-processing - OpenMP,使用所有内核并行

c - 如何继续制作一个可以跟踪另一个软件的功能调用的软件?

c# - 数字处理线程的高效取消事件实现?

python - 在 Flask 中使用 joblib 进行并行计算

apache-spark - Apache Kafka 和 Spark 流

java - 连接 "n"个观察者后 refCount : call underlying ConnectObservable. connect()

java - 如何在方法之间同步线程?

multithreading - 什么是竞争条件?

java - RxJava 中 REAL 背压的最佳实现