java - 取消已经运行的 CompletableFutures 的预期模式是什么

标签 java completable-future

我在“Java 8 实战”中找不到任何关于为什么 CompletableFuture 故意忽略 mayInterruptIfRunning 的信息。但即使是这样,我也没有真正看到自定义 cancel(boolean) 的任何 Hook ,这在中断不影响阻塞操作(例如 I/O)的情况下会派上用场流、简单的锁等)。到目前为止,任务本身似乎是预期的 Hook ,在 Future 的抽象级别上工作在这里根本不会有任何好处。

因此,我要问的是必须引入的最少一组样板代码,才能从这种情况中挤出一些 neet 自定义取消机制。

最佳答案

A CompletableFuture是封装三种状态之一的对象:

  1. 未完成,
  2. 完成一个值,或者
  3. 出色完成

从“未完成”到其他状态之一的转换可以由传递给其工厂方法之一或传递给 CompletionStage 之一的函数触发。实现方法。

但也可以调用 completecompleteExceptionally在上面。在这方面,请调用 cancel它与调用 completeExceptionally(new CancellationException()) 具有相同的效果在上面。

这也是它的类名所暗示的,这是一个可以完成而不是将[最终]完成的 future 。该类不提供对可能在任意线程中运行的现有完成尝试的控制,也不特别处理由其自身安排的完成尝试。

了解通过 CompletionStage 链接操作时也很重要实现方法,由此产生的 future 只代表链的最后阶段,取消它也只能影响最后阶段。

例如以下代码

CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    }).thenApply(s -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        s = s.toUpperCase();
        System.out.println("return transformed " + s);
        return s;
    }).thenAccept(s -> {
        System.out.println("starting last stage");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("processed " + s);
    });
cf.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

将打印

return initial value1
return transformed VALUE1

证明只有链的最后阶段被取消,而前一阶段已完成,完全不受影响。

保留对第一阶段的引用以试图取消整个链只有在第一阶段尚未完成时才有效,因为取消已经完成的 future 的尝试没有任何效果。

long[] waitBeforeCancel = { 500, 1500 };
for(long l: waitBeforeCancel) {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    });
    first.thenApply(s -> {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            s = s.toUpperCase();
            System.out.println("return transformed " + s);
            return s;
        }).thenAccept(s -> {
            System.out.println("starting last stage");
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            System.out.println("processed " + s);
        });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    first.cancel(false);
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel
return initial value1

return initial value1
Trying to cancel
return transformed VALUE1
starting last stage
processed VALUE1

这表明当第一阶段及时取消时整个链被取消(除了 Supplier 的第一个代码由于不存在的中断仍然完成),而取消太晚​​不会影响任何阶段.

记住所有CompletableFuture实例,为了能够取消所有这些,将破坏 API 的目的。您可以使用一个跟踪所有当前处理的作业的执行器,在最后一个阶段被取消时将取消和中断转发给它们。 CompletableFuture然后实现会将取消转发到相关阶段。这样,完成的阶段仍然可以被垃圾收集。

设置有点复杂;预先需要包装器执行器来构造 CompletableFuture链和取消转发需要已构建链的最后阶段。这就是为什么我制作了一个实用程序方法来接受链构造代码作为 Function<Executor,CompletableFuture<T>> :

static <T> Future<T> setupForInterruption(Function<Executor,CompletableFuture<T>> f) {
    return setupForInterruption(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setupForInterruption(
        Function<Executor,CompletableFuture<T>> f, Executor e) {

    AtomicBoolean dontAcceptMore = new AtomicBoolean();
    Set<Future<?>> running = ConcurrentHashMap.newKeySet();
    Executor wrapper = r -> {
        if(dontAcceptMore.get()) throw new CancellationException();
        FutureTask<?> ft = new FutureTask<>(r, null) {
            @Override protected void done() { running.remove(this); }
        };
        running.add(ft);
        e.execute(ft);
    };
    CompletableFuture<T> cf = f.apply(wrapper);
    cf.whenComplete((v,t) -> {
        if(cf.isCancelled()) {
            dontAcceptMore.set(true);
            running.removeIf(ft -> ft.cancel(true) || ft.isDone());
        }
    });
    return cf;
}

这可以像这样使用

long[] waitBeforeCancel = { 500, 1500, 2500, 3500 };
for(long l: waitBeforeCancel) {
    Future<?> f = setupForInterruption(executor ->
        CompletableFuture.supplyAsync(() -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                String s = "value1";
                System.out.println("return initial " + s);
                return s;
            }, executor).thenApplyAsync(s -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                s = s.toUpperCase();
                System.out.println("return transformed " + s);
                return s;
            }, executor).thenAcceptAsync(s -> {
                System.out.println("starting last stage");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                System.out.println("processed " + s);
            }, executor));
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    f.cancel(true);
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel

return initial value1
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
processed VALUE1
Trying to cancel

由于 API 使用 Supplier , Function , 和 Consumer , 他们都不允许扔 InterruptedException ,此示例代码对中断进行显式测试并抛出 IllegalStateException反而。这也是它使用 parkNanos 的原因它只是在中断时立即返回而不是 Thread.sleep首先。在实际应用场景中,您可能会调用中断敏感方法并且必须捕获 InterruptedException , InterruptedIOException , 或 InterruptedNamingException (等)并将它们转换为未经检查的异常。

请注意,上述方法总是会因中断而取消,因为 CompletableFuture不会说明取消是否中断。如果要获取这个参数的值,需要前端Future反射(reflect)最后阶段结果的实现,将取消转发给它,但传递 mayInterruptIfRunning 的值到当前正在运行的作业。

class FrontEnd<R> implements Future<R> {
    final CompletableFuture<R> lastStage;
    final Set<Future<?>> running;

    FrontEnd(CompletableFuture<R> lastStage, Set<Future<?>> running) {
        this.lastStage = lastStage;
        this.running = running;
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean didCancel = lastStage.cancel(false);
        if(didCancel)
            running.removeIf(f -> f.cancel(mayInterruptIfRunning) || f.isDone());
        return didCancel;
    }
    @Override
    public boolean isCancelled() {
        return lastStage.isCancelled();
    }
    @Override
    public boolean isDone() {
        return lastStage.isDone();
    }
    @Override
    public R get() throws InterruptedException, ExecutionException {
        return lastStage.get();
    }
    @Override
    public R get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {

        return lastStage.get(timeout, unit);
    }

    static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f) {
        return setup(f, ForkJoinPool.commonPool());
    }
    static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f, Executor e) {
        AtomicBoolean dontAcceptMore = new AtomicBoolean();
        Set<Future<?>> running = ConcurrentHashMap.newKeySet();
        Executor wrapper = r -> {
            if(dontAcceptMore.get()) throw new CancellationException();
            FutureTask<?> ft = new FutureTask<>(r, null) {
                @Override protected void done() { running.remove(this); }
            };
            running.add(ft);
            e.execute(ft);
        };
        CompletableFuture<T> cf = f.apply(wrapper);
        cf.whenComplete((v,t) -> { if(cf.isCancelled()) dontAcceptMore.set(true); });
        return new FrontEnd<>(cf, running);
    }
}

关于java - 取消已经运行的 CompletableFutures 的预期模式是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65539126/

相关文章:

java - Jetty 如何创建自定义 WebSocket

java - 如何使用数据库中的数据填充 ListView

java - 可完成的 future : Waiting for first one normally return?

java - 添加一层 thenApply 后出现不兼容类型 : bad return type in lambda expression X cannot be converted to CompetionStage<X>,

java - CompletableFuture 没有按预期工作

java - 收到输入的特定字符时停止循环

java - 两种渲染方法在两个屏幕上同时工作

java - 在mac上安装hadoop时出错

Java 8 Completable Future - 并行执行

asynchronous - 为什么 CompletableFuture 的多线程代码比单线程代码慢?