java - ExecutorService 与 Runnable 共享 CyclicBarrier

标签 java multithreading executorservice cyclicbarrier

我在使用共享 CyclicBarrier 的 n Runnable 类的 Java 并发解决方案中遇到问题,以及 RunnableExecutorService 处理,这是代码:

public class Worker implements Runnable {
   private CyclicBarrier writeBarrier;
   private int index;
   private int valuetocalculate;

   public Worker(int i,CyclicBarrier writeBarrier)
   {
      this.writeBarrier = writeBarrier;
      this.index = i;
      this.valuetocalculate = 0;
   }

   public void run() {

      //calculations with valuetocalculate
      writeBarrier.await();
      //write new valuetocalculate value
   }
}

public class Context {

    private ArrayList<Worker> workers;
    private Chief chief;

    public Context()
    {
       workers = new ArrayList<Worker>();
       chief = new Chief();
    }

    public void generateRandomWorkers(nworkers)
    {
       writeBarrier = newWriteBarrier(workers);
       chief.setBarrier(writeBarrier);
       //generate random woker
       for (int i = 0; i<nworkers;i++)
       {
           Worker worker = new Worker(i,writeBarrier);
           workers.add(worker);
       }
       chief.setWorkersArray(workers);
       chief.start();
     }
}

public class Chief extend Thread {

    private CyclicBarrier writeBarrier;
    private ArrayList<Worker> workers;
    private ExecutorService executor;
    private int cores;

    public Chief ()
    {
       cores = Runtime.getRuntime().availableProcessors()+1;
    }

    public void setBarrier (CyclicBarrier writeBarrier)
    {
       this.writeBarrier = writeBarrier;
    }

    public setWorkersArray(ArrayList<Worker> workers)
    {
       this.workers = workers;
    }

    public ArrayList<Integer> getvaluetocalculate()
    {
        ArrayList<Integer> values = new ArrayList<Integer> ();
        for (int i = 0; i<workers.size();i++)
        {
           values.add(workers.get(i).valuetocalculate);
        }

         return values;
    }

    public void run(){
       while (!stop) //always true for testing
       {
          getvaluetocalculate();
          //make calculations
          writeBarrier.reset();
          executor = Executors.newFixedThreadPool(cores);
          for (int i = 0;i<workers.size();i++)
          {
             Runnable runnable = workers.get(i);
             executor.execute(runnable);
           }
           executor.shutdown();
           while (!executor.isTerminated())
           {
           }
        }
     }
}

一切从主要开始:
Context = new Context();
context.generateRandomWorkers();

问题是 Runnable不会在 Chief 的运行中进行第一次“迭代” ,所以问题似乎是 worker 没有超过 writerBarrier.await(); ,而不是如果我初始化这个:
executor = Executors.newFixedThreadPool(cores);

workers.size() ,有效,但似乎不同步...我该如何解决?

最佳答案

好的,看起来您正在尝试执行以下操作

  • 一名控制 worker 的主管/调度员
  • 一名或多名 worker 进行计算
  • 酋长处决所有 worker
  • 酋长等待所有 worker 完成
  • Chief 从每个 Worker 获取结果以计算结果

  • 假设以上是您的问题。
  • 在worker run() 方法中执行barrier.await() 可防止该线程被释放回池以运行后续worker。因此,当池大小 < worker 大小时,第一个“池大小” worker 消耗线程,然后停止等待其他无法运行的线程。这就是为什么除非您将池大小更改为workers.size(),否则您的所有worker 都不会运行。
  • valuetocalculate变量在设置结果的工作人员和读取它的负责人之间不同步,因此您可能会看到陈旧的结果。

  • 实现这种系统的正确方法是让工作人员实现 Callable,一旦工作人员计算完毕,可调用对象就会返回结果。这负责将结果发布给您的主管(您将在下面看到这是如何工作的)。

    移除循环障碍,你不需要那个。

    像现在一样创建执行器并调用 invokeAll()带有可调用对象(您的工作人员)的列表。此方法使用执行程序调用 Callables 并等待它们完成。它会阻塞,直到所有工作人员都完成,此时它将返回 List<Future<T>> .每个 Future 对应于您传入的其中一个 worker /可调用对象。迭代列表以提取结果。如果工作人员尝试 get() 失败结果是Future会抛出异常。

    希望有帮助。

    关于java - ExecutorService 与 Runnable 共享 CyclicBarrier,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23563466/

    相关文章:

    java - 在多线程环境中并行执行每个子任务

    Java ExecutorService - 有时比顺序处理慢?

    java - 如果我的类(class)已经在扩展另一个类(class),如何使用 unicastremoteobject ...?

    java - 数组变量基类型

    python - 线程卡住(python w/GTK)

    java - 在 Hadoop 集群中使用另一个类的静态变量

    java - 向 ThreadPoolExecutor 的 BlockingQueue 添加任务是否可取?

    java - 在 java 程序运行之间存储数据的最佳方式?

    java - J2ME setCurrent()

    java多线程(newCachedThreadPool),然后将结果写入一个文件?