java - 同时处理大文本文件

标签 java multithreading java-8

所以我有一个很大的文本文件,在本例中大约有 4.5 GB,我需要尽快处理整个文件。现在我已经使用 3 个线程(不包括主线程)对其进行了多线程处理。输入线程读取输入文件,处理线程处理数据,输出线程将处理后的数据输出到文件。

目前瓶颈在处理环节。因此,我想添加更多的处理线程。但是,这会造成我有多个线程访问同一个 BlockingQueue 的情况,因此它们的结果不会保持输入文件的顺序。

我正在寻找的功能示例如下所示: 输入文件:1、2、3、4、5 输出文件:^相同。不是 2、1、4、3、5 或任何其他组合。

我已经编写了一个虚拟程序,它在功能上与实际程序相同,但不包括处理部分(由于处理类包含 secret 信息,我无法向您提供实际程序)。我还应该提到,所有类(输入、处理和输出)都是包含在主类中的所有内部类,主类包含 initialise() 方法和下面列出的主线程代码中提到的类级变量。

主线程:

static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;

private void initialise() throws IOException {
    BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
    BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million. 

    String inputFileName = "test.txt";
    String outputFileName = "outputTest.txt";

    BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
    BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));


    Thread T1 = new Thread(new Input(reader, inputQueue));
    Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
    Thread T3 = new Thread(new Output(writer, outputQueue));

    T1.start();
    T2.start();
    T3.start();

    while (!writerFinished) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    reader.close();
    writer.close();

    System.out.println("Exited.");
}

输入线程:(请原谅注释掉的调试代码,使用它来确保阅读器线程实际执行正确)。

class Input implements Runnable {
    BufferedReader reader;
    BlockingQueue<String> inputQueue;

    Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
        this.reader = reader;
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        String poisonPill = "ChH92PU2KYkZUBR";
        String line;
        //int linesRead = 0;

        try {
            while ((line = reader.readLine()) != null) {
                inputQueue.put(line);
                //linesRead++;

                /*
                if (linesRead == 500_000) {
                    //batchesRead += 1;
                    //System.out.println("Batch read");
                    linesRead = 0;
                }
                */
            }

            inputQueue.put(poisonPill);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

        readerFinished = true;

    }
}

处理线程:(通常这实际上是对线路做一些事情,但出于模型的目的,我只是让它立即推送到输出线程)。如有必要,我们可以通过让线程为每一行 hibernate 一小段时间来模拟它做一些工作。

class Processing implements Runnable {
    BlockingQueue<String> inputQueue;
    BlockingQueue<String> outputQueue;

    Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (inputQueue.isEmpty() && readerFinished) {
                    break;
                }

                String line = inputQueue.take();
                outputQueue.put(line);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出线程:

class Output implements Runnable {
    BufferedWriter writer;
    BlockingQueue<String> outputQueue;

    Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
        this.writer = writer;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        String line;
        ArrayList<String> outputList = new ArrayList<>();

        while (true) {
            try {
                line = outputQueue.take();

                if (line.equals("ChH92PU2KYkZUBR")) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer finished - executing termination");

                    writerFinished = true;
                    break;
                }

                line += "\n";
                outputList.add(line);

                if (outputList.size() == 500_000) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer wrote batch");
                    outputList = new ArrayList<>();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

所以现在一般的数据流是非常线性的,看起来像这样:

输入 > 处理 > 输出。

但是我想要的是这样的:

Data flow diagram

但要注意的是,当数据输出时,它要么需要按正确的顺序排序,要么已经按照正确的顺序排序。

非常感谢有关如何进行此操作的建议或示例。

过去,我曾使用 Future 和 Callable 接口(interface)来解决涉及此类并行数据流的任务,但不幸的是,该代码不是从单个队列中读取的,因此在这里的帮助微乎其微。

我还应该补充一点,对于那些注意到这一点的人来说,batchSize 和 poisonPill 通常在主线程中定义,然后通过变量传递,它们不是通常是硬编码的在输入线程的代码中,输出检查编写器线程。在凌晨 1 点左右编写实验模型时,我有点懒惰。

编辑:我还应该提一下,这最多需要使用 Java 8。 Java 9 及更高版本的功能无法使用,因为这些版本未安装在将运行此程序的环境中。

最佳答案

你可以做什么:

  • 取X个线程进行处理,其中X是可用于处理的核心数
  • 为每个线程提供自己的输入队列。
  • 读取器线程以可预测的方式将记录提供给每个线程的输入队列循环。
  • 由于输出文件对内存来说太大了,你写了 X 个输出文件,每个线程一个,每个文件名中都有线程的索引,这样你就可以从文件名中重建原来的顺序。
  • 该过程完成后,您合并 X 输出文件。一行来自线程 1 的文件,一行来自线程 2 的文件,以此类推。这重构了原始订单。

作为一个额外的好处,因为每个线程都有一个输入队列,所以读者之间的队列不会发生锁争用。 (仅在读者和作者之间)您甚至可以通过将事物以大于 1 的批处理放入输入队列来优化这一点。

关于java - 同时处理大文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56519058/

相关文章:

java - 使用java字符串indexOf

java - 构建快照时不会触发 Jenkins 多分支项目

java - 从 Android App 获取 NTP-Server 的时间

c++ - 不会为所有 thread_local 对象调用析构函数

c++ - 在静态库中存储线程 ID 的正确方法

java-8 - 使用 java 8 流 api 进行嵌套查找

java - 将具有 Runnable 的处理程序升级为 lambda 表达式

java - 使用 Gradle/Java8 构建时存储某些类的方法参数名称

java - 如何创建将返回扩展一个父类(super class)的任何类的列表的方法

c# - .Net:后台 worker 和多 CPU