java - RX Java Single 未从 Single.merge 返回

标签 java rx-java2 vert.x

我有几个 api 调用(Rx singles),我想将它们组合成一个 Single。我正在使用 Single.merge 尝试合并这些调用的结果,但是当我订阅响应时,我得到一个空数组,因为订阅已经发生。我调用 HealthChecker 期望订阅将返回结果列表:

     new HealthChecker(vertx)
        .getHealthChecks(endpoints)
        .subscribe(messages -> {
            log.info("Completed health check {}", messages);
            routingContext.response()
                          .putHeader("content-type", "text/json")
                          .end(messages.toString());
        });

健康检查器类执行逻辑:

public class HealthChecker {

    private static Logger log = LoggerFactory.getLogger(HealthChecker.class);

    private Vertx vertx;
    private WebClient client;

    public HealthChecker(Vertx vertx) {
        this.vertx = vertx;
        client = WebClient.create(vertx);
    }

    public Single<List<String>> getHealthChecks(JsonArray endpoints) {
        return Single.fromCallable(() -> {

            List<Single<String>> healthChecks = endpoints
                .stream()
                .map(endpoint -> getHealthStatus(client, endpoint.toString()))
                .collect(Collectors.toList());

            return consumeHealthChecks(healthChecks).blockingGet();

        });
    }

    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                  .timeout(1500, TimeUnit.MILLISECONDS)
                  .subscribe(message -> {
                      log.info("Got health check {}", message);
                      messages.add(message);
                  }, error -> {
                      log.info("Timeout - could not get health check");

                  });

            return messages;
        });
    }

    private Single<String> getHealthStatus(WebClient client, String endpoint) {
        log.info("getting endpoint {}", endpoint);

        return client
            .getAbs(endpoint)
            .rxSend()
            .map(HttpResponse::bodyAsString)
            .map(response -> response);

    }
}

我希望返回值是一个列表,除了我得到的只是一个空列表,然后是结果。这是日志:

09:12:06.235 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO  sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}

最佳答案

为什么使用fromCallableblockingGet?此外,您还可以触发合并,而无需实际等待它运行完成,因此列表为空。相反,在内部 Single 上进行组合:

public Single<List<String>> getHealthChecks(JsonArray endpoints) {
    return Single.defer(() -> {

        List<Single<String>> healthChecks = endpoints
            .stream()
            .map(endpoint -> getHealthStatus(client, endpoint.toString()))
            .collect(Collectors.toList());

        return consumeHealthChecks(healthChecks);
    });
}

private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
    return Single.merge(healthChecks)
                 .timeout(1500, TimeUnit.MILLISECONDS)
                 .toList();
}

关于java - RX Java Single 未从 Single.merge 返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59561036/

相关文章:

java - rxjava2的Maybe和Optional有什么区别?

java - "No Instance of type variable R exist so that Observable conforms to Observable"更新到 RxJava2 时出错

在 Flowable 中使用方法引用时,Kotlin 无法推断类型

java - 使用 Jackson 将巨大的 JSON 响应反序列化为 POJO 的最有效方法是什么?

java - vert.x 的logging.properties 中的含义是什么?

vert.x - 与 vertx 组合以获取顺序代码

java - JAX-WS WebService、Addressing、MTOM 和 RespectBinding 功能用例

java - 为类(class)成员提供对另一个类(class)成员的引用

Java web start 启动失败 - JRE 7 32位(运行在64位windows系统上)

java - 我是否需要设置 Maven 才能使用 Maven ant 依赖任务?