java - 方法 rxExecuteBlocking 消耗所有结果 - ssh 客户端

标签 java ssh rx-java vert.x

我正在尝试使用 Vert.x 制作非常简单的 SSH 客户端。由于我没有非阻塞 SSH 库,因此我必须处理 rxExecuteBlocking 中的所有内容。当我在一大段代码中运行所有逻辑时,它工作得很好,如下所示:

public Single<String> exec() {
   return vertx.rxExecuteBlocking(f -> {
       String result = "";

       // connect()
       // exec()
       // close()

       f.complete(result);
   }, false);
}

// hostnames :: Observalbe<String>
hostnames()
  .filter()
  .flatMapSingle(this::exec)
  .moreCalls()
  .subscribe(); // OK

我宁愿将 connect()exec()close() 分开并调用:

hostnames()
  .filter()
  .flatMapSingle(this::connect)
  .moreCalls()
  .flatMapSingle(this::exec)
  .moreCalls()
  .flatMapSingle(this::close)
  .subscribe();

但是当运行多于一段阻塞代码时

public Single<Connection> connect() {
  return vertx.rxExecuteBlocking(f -> {
     // connect
  }, false);
}

public Single<Connection> exec() {
  return vertx.rxExecuteBlocking(f -> {
     // exec
  }, false);
}

链在 flatMapSingle(this::connect) 处停止,首先消耗 filter() 的所有结果(建立所有连接),然后继续链。此行为消耗了相当多的资源,因为所有连接都在内存中(此行为让我想起 reduce()collect())

期望的结果不会在链中停止并继续,释放资源并对每个事件执行此操作。

有什么办法可以做到这一点吗?

提前致谢。

最佳答案

我建议尝试使用重载的flatMap,它将特定管道阶段并发订阅的可观察值的最大数量作为参数。假设默认情况下工作线程池中有 20 个线程,您可以为每个 flatMap 调用提供池的一小部分,例如每人 5 个。

hostnames()
  // ...some filtering
  .flatMap(hostname -> this.connect(hostname).toObservable(), 5)
  // ...more operators
  .flatMap(connection -> this.exec(connection).toObservable(), 5)
  // ...more operators
  .flatMap(connection -> this.close(connection).toObservable(), 5)
  .subscribe();

这将确保不会同时使用整个线程池。

可能需要对并发负载进行一些调整。例如,如果 connectexec 更快,则 connect 的并发订阅可观察量会减少,而 exec 的并发订阅量会增加。因此,connect 的结果在 exec 之前不会堆叠在缓冲区中。

关于java - 方法 rxExecuteBlocking 消耗所有结果 - ssh 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45566061/

相关文章:

amazon-web-services - 如何查看在 AWS ELB 后面运行的 SSH 服务器的 SSH 客户端的真实 IP

kotlin - defer() 和 defer{} 有什么区别

java - Android Studio 添加 rxjava 库

java - Scanner 类的 .next() 方法中的 String 值在内存中分配在哪里?

java - LinkedList 的简单 Java 排行榜

linux - 为什么使用带有特定命令的 SSH 时远程主机中没有 shell 进程?

ssh - Spark worker 不会绑定(bind)到 master

android - RxJava Android - 在适当的线程上加载缓存显示数据

java - 加入应用引擎的任何解决方案?

java - wifiManager.getScanResult() 返回空值