java - 同步多个异步请求

标签 java concurrency future

给定一个异步服务,该服务应在完成后关闭。我想执行请求的多个实例。当所有请求完成后,我想关闭该服务。我想知道实现这一目标的最佳方法是什么。因此,这是演示问题的代码但没有实际关闭服务:

    class Service implements Closeable {
        public Service() {/*...*/}

        public ListenableFuture<Integer> processRequest(Integer param) {/*...*/}

        @Override
        public void close() {/*...*/}
    }

    public void proccessRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
        }
    }

我正在考虑关闭该服务的不同选项:

  • 使用CountDownLatch这样:

    public void processRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        CountDownLatch latch = new CountDownLatch(params.size());
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
            Futures.addCallback(res, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer integer) {
                    latch.countDown();
                    if (latch.getCount() == 0) {
                       svc.close();
                    }
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    latch.countDown();
                    if (latch.getCount() == 0) {
                       svc.close();
                    }
                }
            });
        }
    }
    
  • 使用CountDownLatch这样:

    public void processRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        CountDownLatch latch = new CountDownLatch(params.size());
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
            Futures.addCallback(res, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer integer) {
                    latch.countDown();
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    latch.countDown();
                }
            });
        }
        latch.await();
        svc.close();
    }
    
  • 与第一个选项类似,但使用 AtomicInteger .

实现这一目标的最佳方法是什么?第一,第二,第三,这些都不是?

最佳答案

您使用 CountDownLatch 的第一个解决方案看起来不错,但还有一些其他方法。

从版本 20.0 开始,Futures 类具有专门为此目的而设计的 whenAllComplete 方法。使用它你可以写:

Service svc = new Service();
Futures.whenAllComplete(params.stream().map(svc::processRequest).collect(Collectors.toList())).call(() -> {
    svc.close();
    return null;
}, ForkJoinPool.commonPool());

您还可以使用具有类似方法 allOf 的 Java 8 CompletableFuture 类:

CompletableFuture.allOf(params.stream().map(svc::processRequest).toArray(CompletableFuture[]::new))
    .thenAccept(v -> svc.close());

但在这种情况下,您必须使您的 Service 返回 CompletableFuture

关于java - 同步多个异步请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48351368/

相关文章:

Java - 继续对整数添加值

java - 在创建可执行 jar 时指定 JVM 参数(而不是在执行 jar 时)

java - Swing 中两个线程上的同步块(synchronized block)不起作用

multithreading - 是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)完成?

java - 操作和变量是否会在堆栈顶部创建?

java - 如何处理 Camel 中的故障转移负载均衡器故障?

java - 具有 UserThread 和序列化的线程中的 Hibernate session

flutter - 解决后,Flutter/Dart将不会显示FutureBuilder快照

multithreading - 异步填充 Java Map 并将其作为 future 返回

asynchronous - flutter : Question about Futures and async function