java - 使用多线程 RxJava 的响应式(Reactive)拉取

标签 java multithreading java-8 reactive-programming rx-java

我正在尝试在 RxJava 中构建一个响应式(Reactive)拉动观察器。

我的观察者是这样的:

Observable<Command> myObs = Observable.create(s -> {
   Command command;
   int i = 0;
   do {
      command = NetworkOperation1.call(i);
      logger.info("Init command " + i);
      s.onNext(command);
      i++;
   } while (!command.isLast() && i < MAX);
   s.onCompleted();
});

我想在 4 个并发批处理(缓冲区)中处理它,如下所示:

myObs
    .buffer(10)
    .flatMap(batch -> {
          return Observable
                   .from(batch)
                   .subscribeOn(Schedulers.io())
                   .map(c -> {
                       Intermediate m = NetworkOperation2.call(c));
                       logger.info("Done intermediate " + m.id);
                       return m;
                   }
          }, 4);

然后,我需要以不同的大小对结果进行批处理,如下所示:

    .buffer(25)
    .subscribeOn(Schedulers.newThread())
    .subscribe(list ->
         logger.info("Finished batch with " + list.size());

问题是 Observable 中的命令是一次性处理的,而我希望它们在需要时被处理

这是发生的事情的日志:(注意所有 1000 个命令都是一次运行,而不是根据需要调用)

Init command 0
Init command 1
Init command 2
...
Init command 999
Done intermediate 0
Done intermediate 1
...
Done intermediate 24
Finished batch with 25
Done intermediate 25
Done intermediate 26
...
Done intermediate 49
Finished batch with 25
...

问题:有没有办法暂停 Observer 的线程,这样它就不会立即发出所有命令或类似的东西?我已尝试使用 request() 运算符,但无法正常工作。

谢谢。

最佳答案

您需要具有背压意识的来源和运算符(operator)。您使用的运算符支持背压,但您的源不支持。

改为这样做:

myObs = Observable.range(1,1000)
    .map(i -> NetworkOperation1.call(i));

Observable.range 支持背压,因此只会在请求时发出。

关于java - 使用多线程 RxJava 的响应式(Reactive)拉取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31970568/

相关文章:

java - 多线程合并,最佳java线程实践推荐

java - 修改参数的值

c# - 等待启动下一个方法或线程,直到后台线程在使用 C# .Net 的 Windows 窗体应用程序中结束

java - 将映射转换为 java 8 流中更扁平的结构

java - 关闭InputStream即使抛出也会释放资源吗?

java - 组件在 JFrame 中不可见

java - 如何在 Java 8 中使用过滤器忽略 int 数组中的值并收集

java - 尝试使用资源,如果未捕获抛出的异常,资源是否会清理?

java - 使用 Play 进行用户管理!框架 2.0.3

java - 解决 "integer division in floating-point context"警告