java - 当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件

标签 java multithreading file-io io

据我了解,对于普通的主轴驱动系统,使用多个线程读取文件效率很低。

这是一个不同的情况,我有一个可用的高吞吐量文件系统,它提供高达 3GB/s 的读取速度,具有 196 个 CPU 内核和 2TB RAM

单线程 Java 程序读取文件的最大速度为 85-100 MB/s,因此我有可能比单线程更好。我必须读取 1TB 大小的文件,并且我有足够的 RAM 来加载它。

目前我使用以下或类似的东西,但需要编写多线程的东西以获得更好的吞吐量:

Java 7 文件:50 MB/秒

List<String> lines = Files.readAllLines(Paths.get(path), encoding);

Java commons-io:48 MB/s

List<String> lines = FileUtils.readLines(new File("/path/to/file.txt"), "utf-8");

与 Guava 相同:45 MB/s

List<String> lines = Files.readLines(new File("/path/to/file.txt"), Charset.forName("utf-8"));

Java 扫描器类:非常慢

Scanner s = new Scanner(new File("filepath"));
ArrayList<String> list = new ArrayList<String>();
while (s.hasNext()){
    list.add(s.next());
}
s.close();

我希望能够尽快加载文件并以正确的排序顺序构建相同的 ArrayList。

another question读起来很相似,但实际上是不同的,因为: 问题是讨论多线程文件 I/O 在物理上不可能高效的系统,但由于技术进步,我们现在拥有旨在支持高吞吐量 I/O 的系统,因此限制因素是 CPU/SW ,可以通过多线程 I/O 来克服。

另一个问题没有回答如何编写多线程I/O代码。

最佳答案

这是使用多个线程读取单个文件的解决方案。

将文件分成N个 block ,在一个线程中读取每个 block ,然后按顺序合并它们。当心跨越 block 边界的线。这是用户建议的基本思想 slaks

对单个 20 GB 文件的多线程实现进行以下基准测试:

1 个线程:50 秒:400 MB/s

2 个线程:30 秒:666 MB/s

4 线程:20 秒:1GB/s

8 线程:60 秒:333 MB/s

等效 Java7 readAllLines() :400 秒:50 MB/s

注意:这可能仅适用于支持高吞吐量 I/O 的系统,而不适用于普通个人计算机

package filereadtests;

import java.io.*;
import static java.lang.Math.toIntExact;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
    try
    {
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM
        _channel.read(buff, _startLocation);

        //chunk to String
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

    } catch (Exception e)
    {
        e.printStackTrace();
    }
}

//args[0] is path to read file
//args[1] is the size of thread pool; Need to try different values to fing sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
    long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads

    //Max allocation size allowed is ~2GB
    if (chunk_size > (Integer.MAX_VALUE - 5))
    {
        chunk_size = (Integer.MAX_VALUE - 5);
    }

    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down
    executor.shutdown();

    //Wait for all threads to finish
    while (!executor.isTerminated())
    {
        //wait for infinity time
    }
    System.out.println("Finished all threads");
    fileInputStream.close();
}

}

关于java - 当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40412008/

相关文章:

java - 使用 Firebase 数据库无法进行分页

java - 使用相同方法的多个 fragment

php - 打开文件夹而没有PHP警告

java - 在 MongoDB 中创建测试数据

java - 使用字符串值作为计算java的 float

linux - Linux 线程中的文件段/节/记录锁

C++11 线程,它是有效代码吗?

java - 没有 Activity 的 runOnUiThread

java - 如何在ArrayList中存储二进制文件数据

java - 使用Java 7 nio读写文件