java - RxJava 和观察者代码的并行执行

标签 java system.reactive rx-java

我使用 RxJava Observable api 有以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

我的期望是观察代码,即 subscribe() 方法中的代码将在我指定计算调度程序后并行执行。相反,代码仍在单线程上按顺序执行。如何使用 RxJava api 让代码并行运行。

最佳答案

当谈到它的异步/多线程方面时,RxJava 经常被误解。多线程操作的编码很简单,但理解抽象是另一回事。

关于 RxJava 的一个常见问题是如何实现并行化,或者从 Observable 中同时发出多个项目。当然,这个定义违反了 Observable Contract,它规定 onNext() 必须按顺序调用,并且一次不能由多个线程同时调用。

要实现并行性,您需要多个 Observable。

这在一个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

代码和文本comes from this blog post.

关于java - RxJava 和观察者代码的并行执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35425832/

相关文章:

java - 为什么在传递空持久对象而不初始化具有原始数据类型时,smallint 会自动插入 0

system.reactive - Rx.Net GroupJoin 两个 Observables,其中时间处于连接条件

c# - 响应式扩展即时搜索 WPF/MVVM

android - Observable.retrywhen 中的异常类型

java - 在这个简单的 RxJava 异步场景中我真的需要异常处理吗?

java - 如何在 RxJava 中获得多个响应?

java - 匹配器找到一个模式,但抛出 No match available on `start` 方法

java - 在二维数组中查找重复数字

java - 如何在 Firebase 中订购 'subdata'?

events - IObservable 与普通事件或为什么我应该使用 IObservable?