java - 在程序退出之前执行的非阻塞 CompleteableFuture

标签 java concurrency completable-future

我正在执行一个应用程序 (static void main(String[] args),该应用程序以特定节奏向端点异步发出 POST 请求。由于数量由于请求数量和应用程序运行时间较长,将 future 保存在列表中然后事后处理它们是不可行的。

因此,我添加了一个 thenAcceptAsync 调用,该调用仅记录传入的响应。但是,特别是对于较短的测试运行,程序将在某些响应返回之前完成并退出。

如何确保在程序完成之前记录所有响应?

public void replayScripts() {
    final String applicationId = getSparkApplicationId();
    LOGGER.debug("applicationId: {}", applicationId);

    final long scriptsToSubmit = this.config.getMaxScripts() == null ? this.config.getScripts().size()
            : this.config.getMaxScripts().intValue();
    long maxWaitTimeMillis = 0L;
    long actualWaitTimeMillis = 0L;

    for (int i = 0; i < scriptsToSubmit; i++) {
        LOGGER.info("submitting script #{} of {}", i + 1, scriptsToSubmit);

        this.dao.submitScript(this.config.getScripts().get(i).getRawScript())
                .thenAcceptAsync(this::logResponse);

        if (i + 1 < this.config.getScripts().size()) {
            final long millisWait = TimeUnit.MILLISECONDS.convert(
                    Duration.between(this.config.getScripts().get(i).getSubmissionTime(),
                            this.config.getScripts().get(i + 1).getSubmissionTime()).get(ChronoUnit.NANOS),
                    TimeUnit.NANOSECONDS);

            maxWaitTimeMillis += millisWait;
            actualWaitTimeMillis += sleep(millisWait, applicationId);

            if (this.config.getClearCache()) {
                this.dao.clearCache();
            }
        }
    }

    LOGGER.info("max wait time: {}s", maxWaitTimeMillis / 1000.0);
    LOGGER.info("actual wait time: {}s", actualWaitTimeMillis / 1000.0);
}
<小时/>

编辑:解决方案

根据 @irahavoi 的建议,我暂时采用了计数器解决方案,添加了此代码片段,该代码片段使用 AtomicInteger 类成员来跟踪记录的响应数量:

    while (this.config.isLogResponses() && this.loggedResponsesCounter.get() < scriptsToSubmit) {
        try {
            LOGGER.info("sleeping for one second to allow all responses to be logged");
            Thread.sleep(1000);
        } catch (final InterruptedException e) {
            LOGGER.warn("An exception occurred while attempting to wait for all responses to be logged", e);
        }
    }

最佳答案

我看到有两个选项可以解决这个问题(都不是 super 优雅):

  1. 将请求分成批处理(例如,每批处理为 1k 个请求)。并分别处理每批。这样您就可以使用它而不会出现 OOM 的风险:

    CompletableFuture.allOf(aBatchOfLogPipelineResponses).join();

  2. 正如上面评论中提到的,您可以为收到的响应设置一个计数器(每次收到响应时,它都会递增)。并定期在主线程中检查它:如果计数器的值等于scriptsToSubmit,则可以退出。

关于java - 在程序退出之前执行的非阻塞 CompleteableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52043183/

相关文章:

java - 在标签(Actor)上设置文本会停止绘制

concurrency - CCR:使用因果关系处理错误的最佳实践

java - 仅在 ExecutorService 中提交任务时设置变量值

java - "reuse"CompletableFuture 是否安全/良好实践

java - 朱尼特 : How to cover CompletableFuture Code

java.lang.NoSuchMethodError : org. openqa.selenium.support.ui.FluentWait.until(Lcom/google/common/base/Function;)Ljava/lang/Object;

java - 从eclipse外部保存的文件会自动编译吗?

java - 如何让我的循环不断重新输入新变量和运算符以继续计算?

Java 线程生产者消费者算法无法正常工作

Java CompletableFuture 分配执行器