我在使用共享 CyclicBarrier
的 n Runnable 类的 Java 并发解决方案中遇到问题,以及 Runnable
由 ExecutorService
处理,这是代码:
public class Worker implements Runnable {
private CyclicBarrier writeBarrier;
private int index;
private int valuetocalculate;
public Worker(int i,CyclicBarrier writeBarrier)
{
this.writeBarrier = writeBarrier;
this.index = i;
this.valuetocalculate = 0;
}
public void run() {
//calculations with valuetocalculate
writeBarrier.await();
//write new valuetocalculate value
}
}
public class Context {
private ArrayList<Worker> workers;
private Chief chief;
public Context()
{
workers = new ArrayList<Worker>();
chief = new Chief();
}
public void generateRandomWorkers(nworkers)
{
writeBarrier = newWriteBarrier(workers);
chief.setBarrier(writeBarrier);
//generate random woker
for (int i = 0; i<nworkers;i++)
{
Worker worker = new Worker(i,writeBarrier);
workers.add(worker);
}
chief.setWorkersArray(workers);
chief.start();
}
}
public class Chief extend Thread {
private CyclicBarrier writeBarrier;
private ArrayList<Worker> workers;
private ExecutorService executor;
private int cores;
public Chief ()
{
cores = Runtime.getRuntime().availableProcessors()+1;
}
public void setBarrier (CyclicBarrier writeBarrier)
{
this.writeBarrier = writeBarrier;
}
public setWorkersArray(ArrayList<Worker> workers)
{
this.workers = workers;
}
public ArrayList<Integer> getvaluetocalculate()
{
ArrayList<Integer> values = new ArrayList<Integer> ();
for (int i = 0; i<workers.size();i++)
{
values.add(workers.get(i).valuetocalculate);
}
return values;
}
public void run(){
while (!stop) //always true for testing
{
getvaluetocalculate();
//make calculations
writeBarrier.reset();
executor = Executors.newFixedThreadPool(cores);
for (int i = 0;i<workers.size();i++)
{
Runnable runnable = workers.get(i);
executor.execute(runnable);
}
executor.shutdown();
while (!executor.isTerminated())
{
}
}
}
}
一切从主要开始:
Context = new Context();
context.generateRandomWorkers();
问题是
Runnable
不会在 Chief
的运行中进行第一次“迭代” ,所以问题似乎是 worker 没有超过 writerBarrier.await();
,而不是如果我初始化这个:executor = Executors.newFixedThreadPool(cores);
与
workers.size()
,有效,但似乎不同步...我该如何解决?
最佳答案
好的,看起来您正在尝试执行以下操作
假设以上是您的问题。
valuetocalculate
变量在设置结果的工作人员和读取它的负责人之间不同步,因此您可能会看到陈旧的结果。 实现这种系统的正确方法是让工作人员实现 Callable,一旦工作人员计算完毕,可调用对象就会返回结果。这负责将结果发布给您的主管(您将在下面看到这是如何工作的)。
移除循环障碍,你不需要那个。
像现在一样创建执行器并调用
invokeAll()
带有可调用对象(您的工作人员)的列表。此方法使用执行程序调用 Callables 并等待它们完成。它会阻塞,直到所有工作人员都完成,此时它将返回 List<Future<T>>
.每个 Future 对应于您传入的其中一个 worker /可调用对象。迭代列表以提取结果。如果工作人员尝试 get()
失败结果是Future
会抛出异常。希望有帮助。
关于java - ExecutorService 与 Runnable 共享 CyclicBarrier,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23563466/