我正在尝试使用 Vert.x 制作非常简单的 SSH 客户端。由于我没有非阻塞 SSH 库,因此我必须处理 rxExecuteBlocking 中的所有内容。当我在一大段代码中运行所有逻辑时,它工作得很好,如下所示:
public Single<String> exec() {
return vertx.rxExecuteBlocking(f -> {
String result = "";
// connect()
// exec()
// close()
f.complete(result);
}, false);
}
// hostnames :: Observalbe<String>
hostnames()
.filter()
.flatMapSingle(this::exec)
.moreCalls()
.subscribe(); // OK
我宁愿将 connect()
、exec()
、close()
分开并调用:
hostnames()
.filter()
.flatMapSingle(this::connect)
.moreCalls()
.flatMapSingle(this::exec)
.moreCalls()
.flatMapSingle(this::close)
.subscribe();
但是当运行多于一段阻塞代码时
public Single<Connection> connect() {
return vertx.rxExecuteBlocking(f -> {
// connect
}, false);
}
public Single<Connection> exec() {
return vertx.rxExecuteBlocking(f -> {
// exec
}, false);
}
链在 flatMapSingle(this::connect)
处停止,首先消耗 filter()
的所有结果(建立所有连接),然后继续链。此行为消耗了相当多的资源,因为所有连接都在内存中(此行为让我想起 reduce()
或 collect()
)
期望的结果不会在链中停止并继续,释放资源并对每个事件执行此操作。
有什么办法可以做到这一点吗?
提前致谢。
最佳答案
我建议尝试使用重载的flatMap
,它将特定管道阶段并发订阅的可观察值的最大数量作为参数。假设默认情况下工作线程池中有 20 个线程,您可以为每个 flatMap
调用提供池的一小部分,例如每人 5 个。
hostnames()
// ...some filtering
.flatMap(hostname -> this.connect(hostname).toObservable(), 5)
// ...more operators
.flatMap(connection -> this.exec(connection).toObservable(), 5)
// ...more operators
.flatMap(connection -> this.close(connection).toObservable(), 5)
.subscribe();
这将确保不会同时使用整个线程池。
可能需要对并发负载进行一些调整。例如,如果 connect
比 exec
更快,则 connect
的并发订阅可观察量会减少,而 exec
的并发订阅量会增加。因此,connect
的结果在 exec
之前不会堆叠在缓冲区中。
关于java - 方法 rxExecuteBlocking 消耗所有结果 - ssh 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45566061/