我在“Java 8 实战”中找不到任何关于为什么 CompletableFuture
故意忽略 mayInterruptIfRunning
的信息。但即使是这样,我也没有真正看到自定义 cancel(boolean)
的任何 Hook ,这在中断不影响阻塞操作(例如 I/O)的情况下会派上用场流、简单的锁等)。到目前为止,任务本身似乎是预期的 Hook ,在 Future
的抽象级别上工作在这里根本不会有任何好处。
因此,我要问的是必须引入的最少一组样板代码,才能从这种情况中挤出一些 neet 自定义取消机制。
最佳答案
A CompletableFuture
是封装三种状态之一的对象:
- 未完成,
- 完成一个值,或者
- 出色完成
从“未完成”到其他状态之一的转换可以由传递给其工厂方法之一或传递给 CompletionStage
之一的函数触发。实现方法。
但也可以调用 complete
或 completeExceptionally
在上面。在这方面,请调用 cancel
它与调用 completeExceptionally(new CancellationException())
具有相同的效果在上面。
这也是它的类名所暗示的,这是一个可以完成而不是将[最终]完成的 future 。该类不提供对可能在任意线程中运行的现有完成尝试的控制,也不特别处理由其自身安排的完成尝试。
了解通过 CompletionStage
链接操作时也很重要实现方法,由此产生的 future 只代表链的最后阶段,取消它也只能影响最后阶段。
例如以下代码
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
String s = "value1";
System.out.println("return initial " + s);
return s;
}).thenApply(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}).thenAccept(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("processed " + s);
});
cf.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
将打印
return initial value1
return transformed VALUE1
证明只有链的最后阶段被取消,而前一阶段已完成,完全不受影响。
保留对第一阶段的引用以试图取消整个链只有在第一阶段尚未完成时才有效,因为取消已经完成的 future 的尝试没有任何效果。
long[] waitBeforeCancel = { 500, 1500 };
for(long l: waitBeforeCancel) {
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
String s = "value1";
System.out.println("return initial " + s);
return s;
});
first.thenApply(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}).thenAccept(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
System.out.println("processed " + s);
});
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
System.out.println("Trying to cancel");
first.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
System.out.println();
}
Trying to cancel
return initial value1
return initial value1
Trying to cancel
return transformed VALUE1
starting last stage
processed VALUE1
这表明当第一阶段及时取消时整个链被取消(除了 Supplier
的第一个代码由于不存在的中断仍然完成),而取消太晚不会影响任何阶段.
记住所有CompletableFuture
实例,为了能够取消所有这些,将破坏 API 的目的。您可以使用一个跟踪所有当前处理的作业的执行器,在最后一个阶段被取消时将取消和中断转发给它们。 CompletableFuture
然后实现会将取消转发到相关阶段。这样,完成的阶段仍然可以被垃圾收集。
设置有点复杂;预先需要包装器执行器来构造 CompletableFuture
链和取消转发需要已构建链的最后阶段。这就是为什么我制作了一个实用程序方法来接受链构造代码作为 Function<Executor,CompletableFuture<T>>
:
static <T> Future<T> setupForInterruption(Function<Executor,CompletableFuture<T>> f) {
return setupForInterruption(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setupForInterruption(
Function<Executor,CompletableFuture<T>> f, Executor e) {
AtomicBoolean dontAcceptMore = new AtomicBoolean();
Set<Future<?>> running = ConcurrentHashMap.newKeySet();
Executor wrapper = r -> {
if(dontAcceptMore.get()) throw new CancellationException();
FutureTask<?> ft = new FutureTask<>(r, null) {
@Override protected void done() { running.remove(this); }
};
running.add(ft);
e.execute(ft);
};
CompletableFuture<T> cf = f.apply(wrapper);
cf.whenComplete((v,t) -> {
if(cf.isCancelled()) {
dontAcceptMore.set(true);
running.removeIf(ft -> ft.cancel(true) || ft.isDone());
}
});
return cf;
}
这可以像这样使用
long[] waitBeforeCancel = { 500, 1500, 2500, 3500 };
for(long l: waitBeforeCancel) {
Future<?> f = setupForInterruption(executor ->
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
String s = "value1";
System.out.println("return initial " + s);
return s;
}, executor).thenApplyAsync(s -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
s = s.toUpperCase();
System.out.println("return transformed " + s);
return s;
}, executor).thenAcceptAsync(s -> {
System.out.println("starting last stage");
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if(Thread.interrupted()) throw new IllegalStateException();
System.out.println("processed " + s);
}, executor));
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
System.out.println("Trying to cancel");
f.cancel(true);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
System.out.println();
}
Trying to cancel
return initial value1
Trying to cancel
return initial value1
return transformed VALUE1
starting last stage
Trying to cancel
return initial value1
return transformed VALUE1
starting last stage
processed VALUE1
Trying to cancel
由于 API 使用 Supplier
, Function
, 和 Consumer
, 他们都不允许扔 InterruptedException
,此示例代码对中断进行显式测试并抛出 IllegalStateException
反而。这也是它使用 parkNanos
的原因它只是在中断时立即返回而不是 Thread.sleep
首先。在实际应用场景中,您可能会调用中断敏感方法并且必须捕获 InterruptedException
, InterruptedIOException
, 或 InterruptedNamingException
(等)并将它们转换为未经检查的异常。
请注意,上述方法总是会因中断而取消,因为 CompletableFuture
不会说明取消是否中断。如果要获取这个参数的值,需要前端Future
反射(reflect)最后阶段结果的实现,将取消转发给它,但传递 mayInterruptIfRunning
的值到当前正在运行的作业。
class FrontEnd<R> implements Future<R> {
final CompletableFuture<R> lastStage;
final Set<Future<?>> running;
FrontEnd(CompletableFuture<R> lastStage, Set<Future<?>> running) {
this.lastStage = lastStage;
this.running = running;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean didCancel = lastStage.cancel(false);
if(didCancel)
running.removeIf(f -> f.cancel(mayInterruptIfRunning) || f.isDone());
return didCancel;
}
@Override
public boolean isCancelled() {
return lastStage.isCancelled();
}
@Override
public boolean isDone() {
return lastStage.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return lastStage.get();
}
@Override
public R get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return lastStage.get(timeout, unit);
}
static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f) {
return setup(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f, Executor e) {
AtomicBoolean dontAcceptMore = new AtomicBoolean();
Set<Future<?>> running = ConcurrentHashMap.newKeySet();
Executor wrapper = r -> {
if(dontAcceptMore.get()) throw new CancellationException();
FutureTask<?> ft = new FutureTask<>(r, null) {
@Override protected void done() { running.remove(this); }
};
running.add(ft);
e.execute(ft);
};
CompletableFuture<T> cf = f.apply(wrapper);
cf.whenComplete((v,t) -> { if(cf.isCancelled()) dontAcceptMore.set(true); });
return new FrontEnd<>(cf, running);
}
}
关于java - 取消已经运行的 CompletableFutures 的预期模式是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65539126/