java - 为什么这个 CompletableFuture 不使用共享的 ExecutorService 执行?

标签 java multithreading

我正在考虑数据流语言的想法,并尝试使用执行程序服务将可用线程的数量限制为系统上的核心数量,并创建一个共享线程池。

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 Function a = new Function("A", service);
 Function b = new Function("B", service);
 Function c = new Function("C", service);
 a.addObserver(b);
 a.addObserver(c);
 a.change()

A 发生更改,导致它调用它的 evaluate(),然后 B 和 C 被通知需要更新,并且他们会这样做。

Function.java中我有:

 public class Function implements Func{
    private boolean isComplete;
    private Set<Func> dependsOn;
    private Set<Func> observedBy;
    private ExecutorService service;

    public Function(String name, ExecutorService service){
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
    }

    public void addObserver(Func f){
        observedBy.add(f);
        f.addDependency(this);
    }

    private void evaluate() {
        boolean match = dependsOn.stream().allMatch(Func::isComplete);

        if (match) {
            this.isComplete = false;

            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));

            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    public void addDependency(Func f){
        dependsOn.add(f);
    }

    @Override
    public void change() {;
        evaluate();
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public void onDependentEvaluate(Func f) {
    }

    @Override
    public void onDependentComplete(Func func){
        evaluate();
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }
}

当我将 service 传递给 runAsync 时,只有 A 执行。如果我不传递 service 并且 CompletableFuture 使用 ForkJoin.commonPool,则该图将按我的预期执行。是否有其他方法来共享 ExecutorService ?知道为什么下一个 Future 没有被执行吗?

最佳答案

你的问题比你想象的要简单得多。 tl;dr 您唯一的问题是您没有关闭 ExecutorService,因此您的程序不会终止。

长版本:在我的环境中,该程序按预期工作,并且任务 BC 实际上运行。我重构了您的代码,以允许我创建一个 AbstractFunction,允许我覆盖 Function 实际执行的行为,例如

public abstract class AbstractFunction implements Func {
  protected boolean isComplete;
  protected Set<Func> dependsOn;
  protected Set<Func> observedBy;
  protected ExecutorService service;
  protected String name;

  public AbstractFunction(String name, ExecutorService service) {
    observedBy = new HashSet<>();
    dependsOn = new HashSet<>();
    this.isComplete = false;
    this.service = service;
    this.name = name;
  }

  public void addObserver(Func f) {
    observedBy.add(f);
    f.addDependency(this);
  }

  public void addDependency(Func f) {
    dependsOn.add(f);
  }

  @Override
  public void onComplete() {
    this.isComplete = true;
    observedBy.forEach((f) -> f.onDependentComplete(this));
  }

  @Override
  public boolean isComplete() {
    return this.isComplete;
  }

  @Override
  public void change() { }

  @Override
  public void onDependentEvaluate(Func f) { }

  @Override
  public void onDependentComplete(Func func) { }
}

因此,您原来的(实际上未更改)Function 现在是这样的:

public class Function extends AbstractFunction implements Func {
  public Function(String name, ExecutorService service) {
    super(name, service);
  }

  protected void evaluate() {
    boolean match = dependsOn.stream().allMatch(Func::isComplete);

    if (match) {
      this.isComplete = false;

      // inform each observer that I'm about to start to evaluate
      observedBy.forEach(func -> func.onDependentEvaluate(this));

      CompletableFuture.runAsync(() -> {
        try {
          System.out.println("Doing some work");
          int sleepTime = (int) (Math.random() * 5000);
          System.out.println("Working for " + sleepTime);
          Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
          System.out.println("I was interrupted, my name is: " + name);
          e.printStackTrace();
        }
      }, service).thenRun(this::onComplete);
    }
  }

  @Override
  public void change() {
    evaluate();
  }

  @Override
  public void onDependentComplete(Func f) {
    evaluate();
  }
}

鉴于这个新结构,我创建了一个 ShutdownFunc 来监听并在 "C" 完成时关闭执行程序服务,即

public class ShutdownFunc extends AbstractFunction {
  public ShutdownFunc(String name, ExecutorService service) {
    super(name, service);
  }

  @Override
  public void onDependentComplete(Func f) {
    System.out.println("Shutting down the system");
    try {
      service.shutdown();
      service.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

当然还有主要内容:

public class Main {
  public static void main(String... args) {
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime()
        .availableProcessors());
    Function a = new Function("A", service);
    Function b = new Function("B", service);
    Function c = new Function("C", service);
    a.addObserver(b);
    a.addObserver(c);

    Func d = new ShutdownFunc("D", service);
    c.addObserver(d);

    a.change();
  }
}

输出:

Doing some work
Working for 4315
Doing some work
Working for 2074
Doing some work
Working for 2884
Shutting down the system

程序正确地等待完成,然后正常终止。

关于java - 为什么这个 CompletableFuture 不使用共享的 ExecutorService 执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31973629/

相关文章:

java - 可以处理 http 和 https 请求的代理服务器 - java

c++ - 多线程 - 效率降低,可能由 `false sharing` 引起

java - 处理InterruptedException的最佳方法

C# Getter-Setter 竞争条件

java - 面向对象编程 - 有关设计和访问相关数据的问题

java - .net有没有类似java的源码分析api?

c++ - C++线程创建/删除与线程停止/恢复

c++ - 错误 : use of deleted function ‘std::thread::thread(const std::thread&)'

java - 如何设置jtextarea的自动大小随单词长度变化

java - 如何测量Java代码所花费的时间?