所以我有一个很大的文本文件,在本例中大约有 4.5 GB,我需要尽快处理整个文件。现在我已经使用 3 个线程(不包括主线程)对其进行了多线程处理。输入线程读取输入文件,处理线程处理数据,输出线程将处理后的数据输出到文件。
目前瓶颈在处理环节。因此,我想添加更多的处理线程。但是,这会造成我有多个线程访问同一个 BlockingQueue 的情况,因此它们的结果不会保持输入文件的顺序。
我正在寻找的功能示例如下所示: 输入文件:1、2、3、4、5 输出文件:^相同。不是 2、1、4、3、5 或任何其他组合。
我已经编写了一个虚拟程序,它在功能上与实际程序相同,但不包括处理部分(由于处理类包含 secret 信息,我无法向您提供实际程序)。我还应该提到,所有类(输入、处理和输出)都是包含在主类中的所有内部类,主类包含 initialise() 方法和下面列出的主线程代码中提到的类级变量。
主线程:
static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;
private void initialise() throws IOException {
BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million.
String inputFileName = "test.txt";
String outputFileName = "outputTest.txt";
BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));
Thread T1 = new Thread(new Input(reader, inputQueue));
Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
Thread T3 = new Thread(new Output(writer, outputQueue));
T1.start();
T2.start();
T3.start();
while (!writerFinished) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
reader.close();
writer.close();
System.out.println("Exited.");
}
输入线程:(请原谅注释掉的调试代码,使用它来确保阅读器线程实际执行正确)。
class Input implements Runnable {
BufferedReader reader;
BlockingQueue<String> inputQueue;
Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
this.reader = reader;
this.inputQueue = inputQueue;
}
@Override
public void run() {
String poisonPill = "ChH92PU2KYkZUBR";
String line;
//int linesRead = 0;
try {
while ((line = reader.readLine()) != null) {
inputQueue.put(line);
//linesRead++;
/*
if (linesRead == 500_000) {
//batchesRead += 1;
//System.out.println("Batch read");
linesRead = 0;
}
*/
}
inputQueue.put(poisonPill);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
readerFinished = true;
}
}
处理线程:(通常这实际上是对线路做一些事情,但出于模型的目的,我只是让它立即推送到输出线程)。如有必要,我们可以通过让线程为每一行 hibernate 一小段时间来模拟它做一些工作。
class Processing implements Runnable {
BlockingQueue<String> inputQueue;
BlockingQueue<String> outputQueue;
Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
while (true) {
try {
if (inputQueue.isEmpty() && readerFinished) {
break;
}
String line = inputQueue.take();
outputQueue.put(line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出线程:
class Output implements Runnable {
BufferedWriter writer;
BlockingQueue<String> outputQueue;
Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
this.writer = writer;
this.outputQueue = outputQueue;
}
@Override
public void run() {
String line;
ArrayList<String> outputList = new ArrayList<>();
while (true) {
try {
line = outputQueue.take();
if (line.equals("ChH92PU2KYkZUBR")) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer finished - executing termination");
writerFinished = true;
break;
}
line += "\n";
outputList.add(line);
if (outputList.size() == 500_000) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer wrote batch");
outputList = new ArrayList<>();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
所以现在一般的数据流是非常线性的,看起来像这样:
输入 > 处理 > 输出。
但是我想要的是这样的:
但要注意的是,当数据输出时,它要么需要按正确的顺序排序,要么已经按照正确的顺序排序。
非常感谢有关如何进行此操作的建议或示例。
过去,我曾使用 Future 和 Callable 接口(interface)来解决涉及此类并行数据流的任务,但不幸的是,该代码不是从单个队列中读取的,因此在这里的帮助微乎其微。
我还应该补充一点,对于那些注意到这一点的人来说,batchSize 和 poisonPill 通常在主线程中定义,然后通过变量传递,它们不是通常是硬编码的在输入线程的代码中,输出检查编写器线程。在凌晨 1 点左右编写实验模型时,我有点懒惰。
编辑:我还应该提一下,这最多需要使用 Java 8。 Java 9 及更高版本的功能无法使用,因为这些版本未安装在将运行此程序的环境中。
最佳答案
你可以做什么:
- 取X个线程进行处理,其中X是可用于处理的核心数
- 为每个线程提供自己的输入队列。
- 读取器线程以可预测的方式将记录提供给每个线程的输入队列循环。
- 由于输出文件对内存来说太大了,你写了 X 个输出文件,每个线程一个,每个文件名中都有线程的索引,这样你就可以从文件名中重建原来的顺序。
- 该过程完成后,您合并 X 输出文件。一行来自线程 1 的文件,一行来自线程 2 的文件,以此类推。这重构了原始订单。
作为一个额外的好处,因为每个线程都有一个输入队列,所以读者之间的队列不会发生锁争用。 (仅在读者和作者之间)您甚至可以通过将事物以大于 1 的批处理放入输入队列来优化这一点。
关于java - 同时处理大文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56519058/