Java Fork-Join 不适用于大型 ArrayList

标签 java arraylist parallel-processing fork-join forkjoinpool

我对并行性和并发性还很陌生,我正在尝试使用 Java 中的 Fork-Join 实现中值过滤算法。基本上我将一个输入文件读入 ArrayList 并使用该列表生成一个新的过滤中位数的 ArrayList(包括原始 ArrayList 的第一个和最后一个元素)。

现在我设法制作了该算法的串行/顺序版本,并且运行良好。但是,当我尝试制作 Fork-Join 版本时,它似乎不适用于大型 ArrayLists(100000+)。我用一个非常小的大小为 5 的 ArrayList 尝试了它,它工作正常。我似乎无法找到我的错误(我确定这是逻辑错误和/或实现错误)。任何帮助,将不胜感激。

这是顺序算法片段:

    //Add first boundary element to output ArrayList
    outputElements.add(this.elements.get(0));

    //Start Filter Algorithm 
    while(elements.size()-counter >= filterSize){
        for(int i = 0; i<filterSize; i++){
            tempElements.add(this.elements.get(i+counter));
            if(i==filterSize){
                break;
            }
        }

        Collections.sort(tempElements);
        outputElements.add(tempElements.get((filterSize-1)/2));

        counter++;
        tempElements.clear();
    }

    //Add last boundary element to output ArrayList.
    if (elements != null && !elements.isEmpty()) {
        outputElements.add(elements.get(elements.size()-1));
    }//End Filter Algorithm

这是我创建的并行类。这是不起作用的部分:

public class Parallel extends RecursiveTask<List<Float>>{
    int lo;
    int hi;
    int filterSize;
    String outFile; //Output file name.
    int arraySize;
    List<Float> elements = new ArrayList<Float>();
    List<Float> tempElements = new ArrayList<Float>();
    List<Float> outputElements = new ArrayList<Float>();
    int counter = 0;
    static final int SEQUENTIAL_CUTOFF=1000;

    public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
        this.elements = elements;
        this.outFile = outFile;
        this.filterSize = filterSize;       
        if(lo == 0){
            outputElements.add(this.elements.get(0));
        }
    }
    @Override
    protected List<Float> compute() {
        long startTime = System.nanoTime(); //Algorithm starts here 
        if((hi-lo) < SEQUENTIAL_CUTOFF) {
            while(hi-counter >= filterSize){
                for(int i = lo; i<filterSize; i++){
                    tempElements.add(this.elements.get(i+counter));
                    if(i==filterSize){
                        break;
                    }
                }               
                Collections.sort(tempElements);
                outputElements.add(tempElements.get((filterSize-1)/2));
                counter++;
                tempElements.clear();
                return outputElements;
            }
          }else{              
              Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
              Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
              left.fork();

              List<Float> leftArr = new ArrayList<Float>();
              List<Float> rightArr = new ArrayList<Float>();

             rightArr =  right.compute();
             leftArr = left.join();

             List<Float> newList = new ArrayList<Float>();
             newList.addAll(leftArr);
             newList.addAll(rightArr);       

          }
        long endTime = System.nanoTime();//Algorithm ends here.

        //Write elements to output file 
        PrintWriter writeOutput = null;
        try {
            writeOutput = new PrintWriter(this.outFile, "UTF-8");
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        writeOutput.println(outputElements.size());//Number of lines
        for(int i=0; i<outputElements.size();i++){
            writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
        }

        writeOutput.close(); //Close when output finished writing.
        System.out.println("Parallel complete");
        return null;
    }
}

非常感谢任何帮助。在花了几个小时并围绕 S.O 和 Google 进行了大量研究后,我无法解决这个问题。

编辑:musical_coder 建议发布我遇到的错误,它们就在这里。这是很多错误:

Exception in thread "main" java.lang.IndexOutOfBoundsException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
    at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
    at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at Parallel.compute(Parallel.java:44)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:1)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)

最佳答案

一般来说,你应该避免在多线程代码中使用ArrayList,因为它不是线程安全的:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally.

我在您发布的片段中没有看到任何同时修改列表的内容,但我确实看到您将 this.elements 传递给子 Parallel 实例,这意味着您至少在做一些有风险的事情(在线程之间共享指向可变的非线程安全对象的指针)。

作为第一步,将 this.elements = elements; 替换为以下内容:

this.elements = Collections.unmodifiableList(elements);

通过制作列表unmodifiable ,您将确保如果您的 Parallel 代码试图改变列表,您将在失败点得到一个明确的错误。这不会阻止 Parallel 之外的其他东西修改原始 elements 列表,但它是验证 Parallel 行为的快速、简单的方法正确。如果您遇到 UnsupportedOperationException,您的 Parallel 类将需要重新设计 - 您不能同时修改 ArrayList

如果您没有得到一个UnsupportedOperationException,那么其他东西正在修改您的列表。您需要找到它并将其删除。


一旦您弄清楚是什么导致您的列表同时发生变化,您就可以尝试确定最佳的前进方式。通过所有“正确”的方式在线程之间共享数据超出了我希望在此答案中涵盖的范围,但这里有一些一般的经验法则:

  • 避免可变数据结构 - 将您的 Parallel 类设计为仅处理来自不可变数据结构的数据,例如 Guava's ImmutableList .默认情况下,不可变数据结构是线程安全的。
  • 使用线程安全的数据结构 - 例如,ConcurrentLinkedQueue是多个进程读取和写入相同数据结构的线程安全方式。 ConcurrentHashMap是另一个常用的类。您需要什么很大程度上取决于您要做什么,但这些都是很好的起点。
  • 最小化并发操作的范围 - 即使使用并发数据结构,您的总体目标也应该是让每个任务独立运行,保存从共享源读取和写入共享接收器.尽可能多地处理仅对一个线程可见的对象。
  • Synchronize - 我注意到 Parallel 在没有任何显式同步的情况下写入 outFile。这是危险的,并且可能会引入问题(崩溃或更糟糕的数据损坏)。一次只能有一个线程写入文件。通过拥有专用的文件写入线程或通过显式同步您的文件写入操作来实现这一点。

关于Java Fork-Join 不适用于大型 ArrayList,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31947228/

相关文章:

java - 如何向 glassfish 添加 Interbase 连接池?

java - 将 EBNF 文法转换为上下文无关文法

c++ - 线程构建 block 与 MPI,哪个更适合 mt 需要?

matlab - 使用 PARFOR 和 FOR 的不同结果

c# - 是否可以始终使用 Task 强制一个新线程?

java - Spring集成中的"Repeat until condition satisfied"

java JDBC ArrayIndexOutOfBoundsException 删除、更新时出现

java - arraylist的值在meny中不断重置

java - 将同名 xml 节点添加到 arrayList

c# - 如何在程序退出时保存变量值?