java - 如何继承 CompletableFuture?

标签 java java-8 future

我想继承CompletableFuture覆盖默认的 Executor。也就是说,如果用户在未指定 Executor 的情况下调用方法,我希望使用我自己的 Executor 而不是 CompletableFuture 通常使用的那个>.

Javadoc 暗示了子类化的可能性:

All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.

如果底层实现依赖于像 internalComplete() 这样包私有(private)的方法,我应该如何在子类中实现像 CompletableFuture.supplyAsync() 这样的静态方法?

应该如何继承 CompletableFuture?


我想做什么...

我的用户代码需要使用同一个执行器异步执行多个任务。例如:CompletableFuture.supplyAsync(..., executor).thenApplyAsync(..., executor).thenApplyAsync(..., executor)。我希望自定义 CompletableFuture 实现在所有后续调用中使用第一个执行程序。

最佳答案

由于您没有向我们展示您尝试过的内容,我们没有机会了解您具体做了什么以及失败的原因。在你澄清之后,它看起来像是一个直接的装饰模式工作,不需要触及 CompletableFuture 的任何内部工作。

import java.util.concurrent.*;
import java.util.function.*;

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e) {
        return my(CompletableFuture.supplyAsync(s, e), e);
    }
    private static <T> CompletableFuture<T> my(CompletableFuture<T> f, Executor e) {
        MyCompletableFuture<T> my=new MyCompletableFuture<>(f, e);
        f.whenComplete((v,t)-> {
            if(t!=null) my.completeExceptionally(t); else my.complete(v);
        });
        return my;
    }
    private final CompletableFuture<T> baseFuture;
    private final Executor executor;

    MyCompletableFuture(CompletableFuture<T> base, Executor e) {
        baseFuture=base;
        executor=e;
    }
    private <T> CompletableFuture<T> my(CompletableFuture<T> base) {
        return my(base, executor);
    }
    @Override
    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
        return my(baseFuture.acceptEitherAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return my(baseFuture.applyToEitherAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
        return my(baseFuture.handleAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterBothAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterBothAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterEitherAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterEitherAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return my(baseFuture.thenAcceptAsync(action, executor));
    }
    @Override
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return my(baseFuture.thenAcceptBothAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T, ? extends U> fn) {
        return my(baseFuture.thenApplyAsync(fn, executor));
    }
    @Override
    public <U, V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T, ? super U, ? extends V> fn) {
        return my(baseFuture.thenCombineAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn) {
        return my(baseFuture.thenComposeAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return my(baseFuture.thenRunAsync(action, executor));
    }
    @Override
    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
        return my(baseFuture.whenCompleteAsync(action, executor));
    }
}

这是一个简单的测试用例,表明它按预期工作:

ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();
Executor e=r -> {
    System.out.println("adding delay");
    ses.schedule(r, 2, TimeUnit.SECONDS);
};
MyCompletableFuture.supplyAsync(()->"initial value", e)
  .thenApplyAsync(String::hashCode)
  .thenApplyAsync(Integer::toOctalString)
  .thenAcceptAsync(System.out::println);

关于java - 如何继承 CompletableFuture?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26579139/

相关文章:

scala - 如何在它们自己的执行上下文中而不是在 Actor 系统调度程序上运行 future。 [斯卡拉| Akka ]

java - java 将数组元素按顺序插入数据库

java - 如何使用 Graph API 将图像上传到 Facebook

java - Mac OS上的OpenJDK8和JavaFX

java-8 - 如何在google CacheCacheLoader中传递多个参数?

java - 用 java8 lambda 函数替换所有

java - Scala - 如何使用 Futures 在发生异常时发送失败响应

ScalaTest 无法验证 Future 中的模拟函数调用

java - 如何在实现 Parcelable 的类中使用 transient 变量?

java - 具有自定义布局的 ListView,包括两个 RadioButton