java - 取消一个 CompletableFuture 链

标签 java java-8

我有一个我想取消的异步服务调用链。好吧,实际上,我有两个并行进行的服务调用链,如果一个成功,我想取消另一个。

对于 guava 的 future ,我习惯于通过取消最后一个 future 来取消整个 future 链。看来我不能用 java-8 的 future 来做到这一点。 除非有人知道怎么做。

你的任务,如果你选择接受它,是告诉我是否可以保持我漂亮的语法并取消链。否则,我将编写自己的链接 future 包装器 - 特别是在 this question 之后.


下面是我自己的测试和尝试。

@Test
public void shouldCancelOtherFutures() {
    // guava
    ListenableFuture<String> as = Futures.immediateFuture("a");
    ListenableFuture<String> bs = Futures.transform(as, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> cs = Futures.transform(bs, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> ds = Futures.transform(cs, Functions.<String>identity());

    ds.cancel(false);
    assertTrue(cs.isDone()); // succeeds

    // jdk 8
    CompletableFuture<String> ac = CompletableFuture.completedFuture("a");
    CompletableFuture<String> bc = ac.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> cc = bc.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> dc = cc.thenApply(Function.identity());

    dc.cancel(false);
    assertTrue(cc.isDone()); // fails
}

(假设每个 thenCompose()Futures.transform(x, AsyncFunction) 代表一个异步服务调用。)

我明白为什么 Doug Lee 的研究生大军会这样做。有了分支链,一切都应该被取消吗?

CompletableFuture<Z> top = new CompletableFuture<>()
    .thenApply(x -> y(x))
    .thenCompose(y -> z(y));

CompletableFuture<?> aBranch = top.thenCompose(z -> aa(z));
CompletableFuture<?> bBranch = top.thenCompose(z -> bb(z));

...
bBranch.cancel(false);
// should aBranch be canceled now?

我可以使用自定义包装函数解决这个问题,但它会扰乱漂亮的语法。

private <T,U> CompletableFuture<U> transformAsync(CompletableFuture<T> source, Function<? super T,? extends CompletableFuture<U>> transform) {
    CompletableFuture<U> next = source.thenCompose(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

private <T,U> CompletableFuture<U> transform(CompletableFuture<T> source, Function<T,U> transform) {
    CompletableFuture<U> next = source.thenApply(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

// nice syntax I wished worked
CompletableFuture<?> f1 = serviceCall()
        .thenApply(w -> x(w))
        .thenCompose(x -> serviceCall())
        .thenCompose(y -> serviceCall())
        .thenApply(z -> $(z));

// what works, with less readable syntax
CompletableFuture<?> f2 =
        transform(
            transformAsync(
                transformAsync(
                    transform(serviceCall, x(w)),
                    x -> serviceCall()),
                y -> serviceCall()),
            z -> $(z));

最佳答案

这取决于你的目标是什么。我认为,拥有中间 CompletableFuture 并不重要s 报告完成状态,因为您在使用链式构造调用时通常不会注意到。重要的一点是你希望你的贵serviceCall()不被触发。

一个解决方案可以是:

CompletableFuture<String> flag=new CompletableFuture<>();

CompletableFuture<String> ac = serviceCall()
  .thenCompose(x -> flag.isCancelled()? flag: serviceCall())
  .thenCompose(x -> flag.isCancelled()? flag: serviceCall());
ac.whenComplete((v,t)->flag.cancel(false));// don’t chain this call

这使用了 whenComplete在您的解决方案中调用 like 但仅在最终 CompletableFuture 上调用将取消传播到专用的 flag目的。调用后cancel下一个thenCompose调用将检测取消并返回取消的 future ,因此取消将传播链,因此不再调用组合或应用方法。

缺点是不能与thenApply结合使用作为Function无法返回已取消的 future 。因此,当异步服务调用完成并与 Function 链接时,即使发生取消,该功能也会应用。

解决此问题的替代解决方案是为您的 serviceCall 创建一个包装函数其中包括启动前和完成后的测试:

CompletableFuture<String> serviceCall(CompletableFuture<String> f) {
    if(f.isCancelled()) return f;
    CompletableFuture<String> serviceCall=serviceCall();
    return serviceCall.thenCompose(x->f.isCancelled()? f: serviceCall);
}

那么您的用例将如下所示:

CompletableFuture<String> flag=new CompletableFuture<>();

CompletableFuture<String> ac = serviceCall(flag)
  .thenApply(w->x(w))
  .thenCompose(x -> serviceCall(flag))
  .thenCompose(x -> serviceCall(flag))
  .thenApply(z -> $(z));
ac.whenComplete((v,t)->flag.cancel(false));

当然,你必须替换<String>使用原始 serviceCall() 的任何类型参数用于 CompletableFuture<T> .

关于java - 取消一个 CompletableFuture 链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25417881/

相关文章:

java - 为什么 Spring Web/MVC 不将我的模型属性添加到我的重定向 View 的 URL 中?

java - 如何使 JsonGenerator 漂亮地打印 Date 和 DateTime 值?

java - 如何在 java 中通过键-> 集合映射流式传输,其中每个流式元素都是键和集合中的每个元素?

java - 为什么在静态上下文中使用实例方法时 javac 会发出 "error: method in class cannot be applied to given types"?

java - java 8 中 anyMatch 和 findAny 的区别

java - Android:如果在方法中调用 AsyncTask.execute(new Runnable()),则重写 onPostExecute()

java - ehcache memorystoresize 和 diskstoresize

java - Mandelbrot 集的视觉表示

java - 如何使用流替换两个循环并保留列表中相同的元素

java - CompletableFuture 如何知道任务是独立的?