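I'm experimenting with the idea of a dataflow language, and I'm trying to use an executor service to limit the number of available threads to the number of cores on the system by creating a shared thread pool.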
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Function a = new Function("A", service);
Function b = new Function("B", service);
Function c = new Function("C", service);
a.addObserver(b);
a.addObserver(c);
a.change();
When A changes, it calls its evaluate(); B and C are then notified that they need to update, and they do so.
In Function.java I have:
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class Function implements Func {
    private boolean isComplete;
    private Set<Func> dependsOn;
    private Set<Func> observedBy;
    private ExecutorService service;

    public Function(String name, ExecutorService service) {
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
    }

    public void addObserver(Func f) {
        observedBy.add(f);
        f.addDependency(this);
    }

    private void evaluate() {
        boolean match = dependsOn.stream().allMatch(Func::isComplete);
        if (match) {
            this.isComplete = false;
            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    public void addDependency(Func f) {
        dependsOn.add(f);
    }

    @Override
    public void change() {
        evaluate();
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public void onDependentEvaluate(Func f) {
    }

    @Override
    public void onDependentComplete(Func func) {
        evaluate();
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }
}
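When I pass service to runAsync, only A executes. If I don't pass service, so that CompletableFuture uses ForkJoinPool.commonPool(), the graph executes as I expect. Is there another way to share an ExecutorService? Any idea why the next future isn't being executed?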
Best answer
Your problem is much simpler than you think. tl;dr: your only problem is that you never shut down the ExecutorService, so your program does not terminate.
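To illustrate the tl;dr, here is a minimal sketch, assuming a pool created with Executors.newFixedThreadPool (whose default thread factory creates non-daemon threads). The class name ShutdownDemo and the method runThenShutdown are hypothetical and not part of the original code. Without the shutdown() call, the idle worker threads would keep the JVM alive after main() returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question or answer.
class ShutdownDemo {

    // Runs one task on a fixed pool, then shuts the pool down.
    // Returns true if the pool terminated within the timeout.
    static boolean runThenShutdown() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        CompletableFuture<Void> work =
                CompletableFuture.runAsync(() -> System.out.println("Doing some work"), service);
        work.join(); // wait for the task itself to finish

        // Without this call the pool's non-daemon worker threads stay alive,
        // so the JVM keeps running even after main() has returned.
        service.shutdown();
        try {
            return service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + runThenShutdown());
    }
}
```

Commenting out service.shutdown() in this sketch should reproduce the symptom: the work is done, but the process never exits.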
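Long version: in my environment the program works as expected, and tasks B and C do in fact run. I refactored your code to introduce an AbstractFunction, which lets me override what a Function actually does, e.g.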
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;

public abstract class AbstractFunction implements Func {
    protected boolean isComplete;
    protected Set<Func> dependsOn;
    protected Set<Func> observedBy;
    protected ExecutorService service;
    protected String name;

    public AbstractFunction(String name, ExecutorService service) {
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
        this.name = name;
    }

    public void addObserver(Func f) {
        observedBy.add(f);
        f.addDependency(this);
    }

    public void addDependency(Func f) {
        dependsOn.add(f);
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }

    // The callbacks below are no-ops by default; subclasses override what they need.
    @Override
    public void change() { }

    @Override
    public void onDependentEvaluate(Func f) { }

    @Override
    public void onDependentComplete(Func func) { }
}
So your original (practically unchanged) Function now looks like this:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class Function extends AbstractFunction implements Func {
    public Function(String name, ExecutorService service) {
        super(name, service);
    }

    protected void evaluate() {
        boolean match = dependsOn.stream().allMatch(Func::isComplete);
        if (match) {
            this.isComplete = false;
            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    System.out.println("I was interrupted, my name is: " + name);
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    @Override
    public void change() {
        evaluate();
    }

    @Override
    public void onDependentComplete(Func f) {
        evaluate();
    }
}
Given this new structure, I created a ShutdownFunc that listens for "C" to complete and then shuts down the executor service, i.e.
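import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownFunc extends AbstractFunction {
    public ShutdownFunc(String name, ExecutorService service) {
        super(name, service);
    }

    @Override
    public void onDependentComplete(Func f) {
        System.out.println("Shutting down the system");
        try {
            // shutdown() rejects new tasks; tasks already submitted still run to completion
            service.shutdown();
            // blocks until the pool has terminated or 5 seconds have elapsed
            service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}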
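One detail that may be worth knowing about ShutdownFunc: its onDependentComplete is most likely invoked on a pool thread, because a thenRun callback registered before the task finishes normally runs on the thread that completed the task. The sketch below, with the hypothetical names AwaitInsidePoolDemo and terminatedWhenAwaitedFromWorker, suggests that awaitTermination called from inside a pool task cannot observe termination, since the calling worker is itself still busy. Under that assumption, the 5-second wait in ShutdownFunc would run to its full timeout before the program exits.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class AwaitInsidePoolDemo {

    // Calls shutdown() and awaitTermination() from a pool thread and reports
    // whether the pool was seen as terminated from inside that task.
    static boolean terminatedWhenAwaitedFromWorker() {
        ExecutorService service = Executors.newFixedThreadPool(1);
        CompletableFuture<Boolean> seenFromWorker = CompletableFuture.supplyAsync(() -> {
            service.shutdown();
            try {
                // The worker running this lambda is still busy,
                // so the pool cannot reach the terminated state yet.
                return service.awaitTermination(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }, service);
        return seenFromWorker.join();
    }

    public static void main(String[] args) {
        System.out.println("Terminated as seen from worker: " + terminatedWhenAwaitedFromWorker());
    }
}
```

The program still exits cleanly either way, because shutdown() alone is enough to let the worker threads finish once their tasks are done.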
And of course the main class:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String... args) {
        ExecutorService service = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        Function a = new Function("A", service);
        Function b = new Function("B", service);
        Function c = new Function("C", service);
        a.addObserver(b);
        a.addObserver(c);
        // D observes C and shuts the pool down once C has completed
        Func d = new ShutdownFunc("D", service);
        c.addObserver(d);
        a.change();
    }
}
Output:
Doing some work
Working for 4315
Doing some work
Working for 2074
Doing some work
Working for 2884
Shutting down the system
The program correctly waits for completion and then terminates normally.
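As a possible alternative to a dedicated shutdown node, the same A -> {B, C} graph can be expressed directly with CompletableFuture and shut down from the main thread. This is only a sketch under stated assumptions: the class SharedPoolGraph and the method runGraph are hypothetical, the work is reduced to recording a name, and it replaces the observer classes instead of extending them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the original author's design.
class SharedPoolGraph {

    // Runs A, then B and C, all on the shared pool.
    // Returns the node names in the order they finished.
    static List<String> runGraph(ExecutorService service) {
        List<String> finished = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> finished.add("A"), service);
        // thenRunAsync with an explicit executor keeps the dependents on the same pool
        CompletableFuture<Void> b = a.thenRunAsync(() -> finished.add("B"), service);
        CompletableFuture<Void> c = a.thenRunAsync(() -> finished.add("C"), service);
        // Block the caller until both leaves of the graph are done
        CompletableFuture.allOf(b, c).join();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            System.out.println("Completed: " + runGraph(service));
        } finally {
            // Shut down from the main thread once the whole graph has finished
            service.shutdown();
            service.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Because the shutdown happens only after allOf(b, c) completes, no node can be rejected by a pool that was shut down while part of the graph was still pending.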
Regarding "java - Why does this CompletableFuture not execute with a shared ExecutorService?", the original question is on Stack Overflow: https://stackoverflow.com/questions/31973629/