java - 随着线程数量的增加,Levenshtein 距离的并行实现速度会变慢

标签 java multithreading parallel-processing

这是我出于乐趣而编写的 Levenshtein 距离的并行实现。我对结果感到失望。我在核心 i7 处理器上运行它,因此我有大量可用线程。但是,当我增加线程数时,性能会显着下降。我的意思是,对于相同大小的输入,如果有更多线程,它实际上运行得更慢。

我希望有人可以看看我使用线程和 java.util.concurrent 包的方式,并告诉我是否做错了什么。我真的只对并行性没有像我预期的那样工作的原因感兴趣。我不希望读者看到这里发生的复杂索引。我相信我所做的计算是正确的。但即使不是,我认为当我增加线程池中的线程数量时,我仍然应该看到接近线性的加速。

我已经包含了我使用的基准测试代码。我正在使用找到的库 here用于基准测试。第二个代码块是我用于基准测试的代码块。

感谢您的帮助:)。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class EditDistance {
    private static final int MIN_CHUNK_SIZE = 5;
    private final ExecutorService threadPool;
    private final int threadCount;
    private final String maxStr;
    private final String minStr;
    private final int maxLen;
    private final int minLen;

    public EditDistance(String s1, String s2, ExecutorService threadPool,
            int threadCount) {
        this.threadCount = threadCount;
        this.threadPool = threadPool;
        if (s1.length() < s2.length()) {
            minStr = s1;
            maxStr = s2;
        } else {
            minStr = s2;
            maxStr = s1;
        }
        maxLen = maxStr.length();
        minLen = minStr.length();
    }

    public int editDist() {
        int iterations = maxLen + minLen - 1;
        int[] prev = new int[0];
        int[] current = null;

        for (int i = 0; i < iterations; i++) {
            int currentLen;
            if (i < minLen) {
                currentLen = i + 1;
            } else if (i < maxLen) {
                currentLen = minLen;
            } else {
                currentLen = iterations - i;
            }

            current = new int[currentLen * 2 - 1];
            parallelize(prev, current, currentLen, i);
            prev = current;
        }
        return current[0];
    }

    private void parallelize(int[] prev, int[] current, int currentLen,
            int iteration) {
        int chunkSize = Math.max(current.length / threadCount, MIN_CHUNK_SIZE);
        List<Future<?>> futures = new ArrayList<Future<?>>(currentLen);
        for (int i = 0; i < currentLen; i += chunkSize) {
            int stopIdx = Math.min(currentLen, i + chunkSize);
            Runnable worker = new Worker(prev, current, currentLen, iteration,
                    i, stopIdx);
            futures.add(threadPool.submit(worker));
        }
        for (Future<?> future : futures) {
            try {
                Object result = future.get();
                if (result != null) {
                    throw new RuntimeException(result.toString());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // We can only finish the computation if we complete
                // all subproblems
                throw new RuntimeException(e);
            }
        }
    }

    private void doChunk(int[] prev, int[] current, int currentLen,
            int iteration, int startIdx, int stopIdx) {
        int mergeStartIdx = (iteration < minLen) ? 0 : 2;

        for (int i = startIdx; i < stopIdx; i++) {
            // Edit distance
            int x;
            int y;
            int leftIdx;
            int downIdx;
            int diagonalIdx;
            if (iteration < minLen) {
                x = i;
                y = currentLen - i - 1;
                leftIdx = i * 2 - 2;
                downIdx = i * 2;
                diagonalIdx = i * 2 - 1;
            } else {
                x = i + iteration - minLen + 1;
                y = minLen - i - 1;
                leftIdx = i * 2;
                downIdx = i * 2 + 2;
                diagonalIdx = i * 2 + 1;
            }
            int left = 1 + ((leftIdx < 0) ? iteration + 1 : prev[leftIdx]);
            int down = 1 + ((downIdx < prev.length) ? prev[downIdx]
                    : iteration + 1);
            int diagonal = penalty(x, y)
                    + ((diagonalIdx < 0 || diagonalIdx >= prev.length) ? iteration
                            : prev[diagonalIdx]);
            int dist = Math.min(left, Math.min(down, diagonal));
            current[i * 2] = dist;

            // Merge prev
            int mergeIdx = i * 2 + 1;
            if (mergeIdx < current.length) {
                current[mergeIdx] = prev[mergeStartIdx + i * 2];
            }
        }
    }

    private int penalty(int maxIdx, int minIdx) {
        return (maxStr.charAt(maxIdx) == minStr.charAt(minIdx)) ? 0 : 1;
    }

    private class Worker implements Runnable {
        private final int[] prev;
        private final int[] current;
        private final int currentLen;
        private final int iteration;
        private final int startIdx;
        private final int stopIdx;

        Worker(int[] prev, int[] current, int currentLen, int iteration,
                int startIdx, int stopIdx) {
            this.prev = prev;
            this.current = current;
            this.currentLen = currentLen;
            this.iteration = iteration;
            this.startIdx = startIdx;
            this.stopIdx = stopIdx;
        }

        @Override
        public void run() {
            doChunk(prev, current, currentLen, iteration, startIdx, stopIdx);
        }
    }

    public static void main(String args[]) {
        int threadCount = 4;
        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
        EditDistance ed = new EditDistance("Saturday", "Sunday", threadPool,
                threadCount);
        System.out.println(ed.editDist());
        threadPool.shutdown();
    }
}

EditDistance 内部有一个私有(private)内部类 Worker。每个worker负责使用EditDistance.doChunk填充当前数组的范围。 EditDistance.parallelize 负责创建这些工作人员,并等待他们完成任务。

以及我用于基准测试的代码:

import java.io.PrintStream;
import java.util.concurrent.*;
import org.apache.commons.lang3.RandomStringUtils;
import bb.util.Benchmark;

public class EditDistanceBenchmark {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Usage: <string length> <thread count>");
            System.exit(1);
        }
        PrintStream oldOut = System.out;
        System.setOut(System.err);

        int strLen = Integer.parseInt(args[0]);
        int threadCount = Integer.parseInt(args[1]);
        String s1 = RandomStringUtils.randomAlphabetic(strLen);
        String s2 = RandomStringUtils.randomAlphabetic(strLen);
        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

        Benchmark b = new Benchmark(new Benchmarker(s1, s2, threadPool,threadCount));
        System.setOut(oldOut);

        System.out.println("threadCount: " + threadCount + 
                " string length: "+ strLen + "\n\n" + b);
        System.out.println("s1: " + s1 + "\ns2: " + s2);

        threadPool.shutdown();
    }

    private static class Benchmarker implements Runnable {
        private final String s1, s2;
        private final int threadCount;
        private final ExecutorService threadPool;

        private Benchmarker(String s1, String s2, ExecutorService threadPool, int threadCount) {
            this.s1 = s1;
            this.s2 = s2;
            this.threadPool = threadPool;
            this.threadCount = threadCount;
        }

        @Override
        public void run() {
            EditDistance d = new EditDistance(s1, s2, threadPool, threadCount);
            d.editDist();
        }

    }
}

最佳答案

很容易意外地编写出不能很好并行化的代码。罪魁祸首是当您的线程竞争底层系统资源(例如缓存行)时。由于该算法本质上作用于物理内存中彼此接近的事物,因此我强烈怀疑这可能是罪魁祸首。

我建议您阅读这篇关于虚假共享的优秀文章

http://www.drdobbs.com/go-parallel/article/217500206?pgno=3

然后仔细检查您的代码是否存在线程相互阻塞的情况。

此外,如果您的线程受 CPU 限制,则运行多于 CPU 核心数的线程会降低性能(如果您已使用所有核心接近 100%,则添加更多线程只会增加上下文切换的开销)。

关于java - 随着线程数量的增加,Levenshtein 距离的并行实现速度会变慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10358337/

相关文章:

java - 如何在 Eclipse 中备份我的 java 项目?

java - 尝试编译和启动一个开源 android 应用程序,出现以下错误 : Error inflating class com. android.deskclock.widget.RtlViewPager

java - 如何在jsf中的bean中使用线程

java - 如何启动、暂停和恢复我的线程? (通过从类扩展线程)

r - R 中使用 brm 的负二项式回归在使用多核时会导致错误

python - 在多 CPU 环境中并行化大量功能

c++ - tbb::parallel_for 将 ‘const value_type’ 作为 ‘this’ 参数传递会丢弃限定符

java - 在 Java 11 中使用 mockito 时未找到序列化程序

java - 如何使用 Java 中的多线程实现 2D 方阵乘法?

java - System.setOut 与 PipedOutputStream