java - Multithreaded file reading in Java

Tags: java multithreading nio

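I read a set of JSON files in a directory, check the name attribute of each one, and populate a HashMap<Name, List<File_path>>. Is there a way to use threads to make this faster so that the files are processed in parallel, or would they still be read one at a time? And would populating the HashMap from several threads cause synchronization problems? The current code is as follows: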

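import java.io.File;
import java.io.FileReader;
import java.io.Reader;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

// The enclosing method is assumed to declare `throws IOException`.
Map<String, List<String>> map = new HashMap<String, List<String>>();
JsonParser parser = new JsonParser();
File[] files = new File(dir).listFiles();
for (File tfile : files) {
    // try-with-resources closes each reader; the previous version never closed them
    try (Reader reader = new FileReader(tfile)) {
        JsonObject jsonObject = parser.parse(reader).getAsJsonObject();
        JsonArray array = jsonObject.getAsJsonArray("array");
        // getAsString() returns the bare value; toString() would keep the JSON quotes
        String name = array.get(0).getAsJsonObject().get("name").getAsString();
        List<String> paths = map.get(name);
        if (paths == null) {
            paths = new LinkedList<String>();
            map.put(name, paths);
        }
        paths.add(tfile.getAbsolutePath());
    }
}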
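On the synchronization question: a plain HashMap must not be modified by several threads without locking. One common option, sketched here under the assumption of Java 8 or later (the answer below targets Java 1.7), is a ConcurrentHashMap whose computeIfAbsent creates each per-name list atomically. The class and method names are hypothetical, and the JSON parsing is replaced by ready-made (name, path) pairs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentGrouping {
    // Groups file paths by name while several threads write to the same map.
    // Each element of namePathPairs is {name, path}; in the real program the name
    // would come from parsing the JSON file (left out of this sketch).
    static Map<String, List<String>> group(List<String[]> namePathPairs, int threads) {
        final Map<String, List<String>> map = new ConcurrentHashMap<String, List<String>>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String[] pair : namePathPairs) {
            final String name = pair[0];
            final String path = pair[1];
            // computeIfAbsent is atomic on ConcurrentHashMap, so two threads never install
            // two different lists for the same name. The list itself is synchronized
            // because several threads may append to it at the same time.
            pool.execute(() -> map
                    .computeIfAbsent(name, k -> Collections.synchronizedList(new ArrayList<String>()))
                    .add(path));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<String[]>();
        for (int i = 0; i < 1000; i++) {
            // Hypothetical (name, path) pairs standing in for parsed JSON files
            pairs.add(new String[] {"name" + (i % 3), "file" + i + ".json"});
        }
        Map<String, List<String>> map = group(pairs, 5);
        System.out.println(map.size() + " " + map.get("name0").size()); // prints "3 334"
    }
}
```

An alternative that avoids shared mutable state entirely is to let each task return its (name, path) result and merge everything into an ordinary HashMap on the calling thread.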

Best answer

I ran an experiment similar to yours (reading 3000 small files distributed across several threads) on a simple PC running Windows 7 and Java 1.7.

[Chart: file read with buffering and multithread comparison]

As you can see, performance improved considerably (by up to 69%) with as few as 5 threads.
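The benchmark below splits the file list into fixed per-thread collections. A sketch of the same idea with an ExecutorService (an assumption, not the code that was measured; Java 8+ for the lambda) submits one task per file, so a thread that finishes early simply picks up the next file. ParallelFileReader and totalSize are hypothetical names, and Files.readAllBytes loads each file into memory, which is reasonable only for small files like the ones tested here.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFileReader {
    // Reads every file fully on a fixed-size pool and returns the total number of bytes.
    static long totalSize(List<Path> files, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (Path file : files) {
                // One task per file; the lambda is a Callable, so IOException may propagate.
                results.add(pool.submit(() -> Files.readAllBytes(file).length));
            }
            long total = 0;
            for (Future<Integer> result : results) {
                total += result.get(); // blocks until that file has been read
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling totalSize(files, 5) would correspond to the 5-thread case above; the best thread count depends on the disk and the OS file cache, so it is worth measuring rather than assuming.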

I also included the buffer size as a parameter in the comparison, but as you can see its effect is not significant (because my files are only 1-8 KB each).
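A further possible reason the buffer size barely mattered, offered as an observation about the OpenJDK implementation rather than something measured in the answer: readInputStream already reads in 4096-byte chunks, and BufferedInputStream passes any read at least as large as its own buffer straight through to the underlying stream. This hypothetical sketch counts the bulk reads that reach the underlying stream for an 8 KB input.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BufferPassThrough {
    // Counts how many bulk reads reach the wrapped stream.
    static class CountingInputStream extends FilterInputStream {
        int calls;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    // Reads `data` in 4096-byte chunks (as readInputStream does), optionally through a
    // BufferedInputStream of the given size (0 = unbuffered), and returns how many
    // reads hit the underlying stream.
    static int underlyingReads(byte[] data, int bufferSize) throws IOException {
        CountingInputStream counting = new CountingInputStream(new ByteArrayInputStream(data));
        InputStream input = bufferSize > 0 ? new BufferedInputStream(counting, bufferSize) : counting;
        byte[] chunk = new byte[4096];
        while (input.read(chunk) >= 0) {
            // the bytes are discarded; only the number of underlying reads matters here
        }
        return counting.calls;
    }
}
```

With a 4096-byte buffer the count equals the unbuffered count, because every 4096-byte request bypasses the buffer; only a larger buffer (8192 here) reduces the number of underlying reads, and for files of a few KB that saves just a couple of native reads per file.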

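One last improvement that comes to mind: pre-size the HashMap so that the expected final number of entries fills only about 70% of its initial capacity, which keeps it below the default load factor of 0.75 and avoids rehashing while it grows.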
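HashMap resizes once its size exceeds its capacity times the load factor (0.75 by default), so avoiding every resize means starting with a capacity of at least the expected entry count divided by 0.75. A minimal sketch of that calculation; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class PresizedMap {
    // Smallest initial capacity that holds `expected` entries without exceeding
    // the default load factor of 0.75.
    static int initialCapacityFor(int expected) {
        return (int) Math.ceil(expected / 0.75);
    }

    // Creates a HashMap large enough for `expected` entries.
    static <K, V> Map<K, V> presized(int expected) {
        return new HashMap<K, V>(initialCapacityFor(expected));
    }
}
```

For 3000 distinct names this gives 4000; HashMap rounds the capacity up to the next power of two (4096), whose resize threshold of 3072 is above 3000. On Java 19 and later, HashMap.newHashMap(int) performs the same calculation.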

Edit

Here is my code:

package demo;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.regex.Pattern;

import com.sun.japex.TestCase;

public abstract class AbstractDirectoryFilesReadDriver extends com.sun.japex.JapexDriverBase
{
    private final int bufferSize;

    protected AbstractDirectoryFilesReadDriver(int bufferSize)
    {
        super();
        this.bufferSize=bufferSize;
    }

    @Override
    public void run(TestCase testCase)
    {
        int numberOfThreads=testCase.getIntParam("number_threads");
        final String mask=testCase.getParam("filename_mask");
        File dir=new File(testCase.getParam("dir"));
        long totalSize=testCase.getLongParam("total_size");
        File[] list=dir.listFiles(new FilenameFilter()
        {
            @Override
            public boolean accept(File dir, String name)
            {
                return Pattern.matches(mask, name);
            }
        });

        // Split the list of files between a number of threads.
        Collection<Collection<File>> collections=splitDirList(list, numberOfThreads);

        // Start the threads and let every one read its subset of files.
        Collection<Thread> threads=new ArrayList<Thread>(numberOfThreads);
        Collection<MyRunnable> runnables=new ArrayList<MyRunnable>(numberOfThreads);
        for (Collection<File> collection : collections)
        {
            MyRunnable runnable=new MyRunnable(collection);
            runnables.add(runnable);
            Thread thread=new Thread(runnable);
            threads.add(thread);
            thread.start();
        }
        try
        {
            for (Thread thread : threads)
            {
                thread.join();
            }

            // Check the read size: Ensure that all the files have been fully read:
            long size=0;
            for (MyRunnable runnable : runnables)
            {
                size+=runnable.getSize();
            }

            System.out.println("numberOfThreads=" + numberOfThreads + ", size=" + size);
            if (size != totalSize)
            {
                throw new RuntimeException("Size check failed: expected size=" + totalSize + ", read size=" + size);
            }
        }
        catch (InterruptedException e)
        {
            throw new Error(e);
        }
    }

    private Collection<Collection<File>> splitDirList(File[] list, int numberOfParts)
    {
        int n=0;
        Collection<Collection<File>> collection=new ArrayList<Collection<File>>(numberOfParts);
        int load=(int)Math.ceil(list.length / (double)numberOfParts);
        for (int i=0; i < numberOfParts; i++)
        {
            Collection<File> part=new ArrayList<File>(load);
            for (int j=0; j < load && n < list.length; j++)
            {
                part.add(list[n++]);
            }
            collection.add(part);
        }
        return collection;
    }

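    private long readFiles(Collection<File> files)
        throws FileNotFoundException,
        IOException
    {
        long size=0;
        for (File file : files)
        {
            // Close every stream once it has been read (Java 7 try-with-resources);
            // otherwise thousands of file handles stay open until they are garbage-collected.
            try (InputStream input=createInputStream(file))
            {
                size+=readInputStream(input);
            }
        }
        return size;
    }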

    private InputStream createInputStream(File file)
        throws FileNotFoundException
    {
        InputStream input=new FileInputStream(file);
        if (this.bufferSize > 0)
        {
            input=new BufferedInputStream(input, this.bufferSize);
        }
        return input;
    }

    /**
     * Reads an InputStream to the end.
     * 
     * @param input InputStream.
     * @return Number of bytes read.
     * @exception java.io.IOException If an error occurred while reading.
     */
    private static long readInputStream(java.io.InputStream input)
        throws java.io.IOException
    {
        long size=0;
        byte[] buffer=new byte[4096];
        int n;
        do
        {
            n=input.read(buffer);
            if (n > 0)
            {
                size+=n;
            }
        }
        while (n >= 0);
        return size;
    }

    private class MyRunnable implements Runnable
    {
        private final Collection<File> files;

        public MyRunnable(Collection<File> files)
        {
            super();
            this.files=files;
        }

        private long size;

        public long getSize()
        {
            return this.size;
        }

        @Override
        public void run()
        {
            try
            {
                this.size=readFiles(this.files);
            }
            catch (IOException e)
            {
                throw new Error(e);
            }
        }
    }
}

// Each driver below is a public class, so each goes in its own source file in package demo.
public class UnbufferedDirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver
{
    public UnbufferedDirectoryFilesReadDriver()
    {
        super(0);
    }
}

public class Buffered4096DirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver
{
    public Buffered4096DirectoryFilesReadDriver()
    {
        super(4096);
    }
}

public class Buffered8192DirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver
{
    public Buffered8192DirectoryFilesReadDriver()
    {
        super(8192);
    }
}
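splitDirList gives every thread ceil(n / threads) files, so the last thread may get fewer, and when there are more threads than files the trailing threads receive empty lists (they start and finish immediately). A standalone copy of the same rule, with hypothetical names, makes the split easy to check.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCheck {
    // Same chunking rule as splitDirList: ceil(n / parts) items per part, filled in order.
    static List<Integer> partSizes(int n, int parts) {
        int load = (int) Math.ceil(n / (double) parts);
        List<Integer> sizes = new ArrayList<>();
        int assigned = 0;
        for (int i = 0; i < parts; i++) {
            int size = Math.min(load, n - assigned); // never negative: assigned <= n
            sizes.add(size);
            assigned += size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(partSizes(3000, 7)); // [429, 429, 429, 429, 429, 429, 426]
        System.out.println(partSizes(3, 5));    // [1, 1, 1, 0, 0]
    }
}
```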

Japex configuration file:

<testSuite name="DirectoryFilesReadDriver" xmlns="http://www.sun.com/japex/testSuite">
    <param name="japex.classPath" value="target/test-classes" />
    <param name="japex.chartType" value="barchart"/>
    <param name="japex.warmupIterations" value="1"/>
    <param name="japex.runIterations" value="1"/>
    <param name="japex.runsPerDriver" value="1"/>
    <param name="number_threads" value="1"/>
    <param name="japex.resultUnit" value="ms"/>

    <param name="dir" value="c:\myfiles" />
    <param name="filename_mask" value=".*\.txt" />
    <!-- You must specify here the total expected size of all the files -->
    <param name="total_size" value="..." />

    <driver name="demo.UnbufferedDirectoryFilesReadDriver">
        <param name="japex.DriverClass" value="demo.UnbufferedDirectoryFilesReadDriver" />
    </driver>
    <driver name="demo.Buffered4096DirectoryFilesReadDriver">
        <param name="japex.DriverClass" value="demo.Buffered4096DirectoryFilesReadDriver" />
    </driver>
    <driver name="demo.Buffered8192DirectoryFilesReadDriver">
        <param name="japex.DriverClass" value="demo.Buffered8192DirectoryFilesReadDriver" />
    </driver>

    <testCase name="threads-01">
        <param name="number_threads" value="1" />
    </testCase>
    <testCase name="threads-02">
        <param name="number_threads" value="2" />
    </testCase>
    <testCase name="threads-05">
        <param name="number_threads" value="5" />
    </testCase>
    <testCase name="threads-10">
        <param name="number_threads" value="10" />
    </testCase>
    <testCase name="threads-20">
        <param name="number_threads" value="20" />
    </testCase>
</testSuite>
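In a Java regular expression an unescaped `.` matches any character, so a filename_mask of `.*.txt` (passed to Pattern.matches in the driver's FilenameFilter) also accepts a name such as notes_txt; writing the mask as `.*\.txt` restricts it to the .txt extension. A quick check, with a hypothetical class name:

```java
import java.util.regex.Pattern;

public class MaskCheck {
    public static void main(String[] args) {
        // Unescaped '.' matches any character, so "notes_txt" also passes.
        System.out.println(Pattern.matches(".*.txt", "notes_txt"));   // true
        // Escaped dot (written "\\." in a Java string literal): only ".txt" names match.
        System.out.println(Pattern.matches(".*\\.txt", "notes_txt")); // false
        System.out.println(Pattern.matches(".*\\.txt", "notes.txt")); // true
    }
}
```

In the XML file itself the backslash needs no extra escaping.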

...and the POM:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>dev</groupId>
    <artifactId>demo-readfiles-multithread</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-readfiles-multithread</name>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.sun.japex</groupId>
            <artifactId>japex</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <compilerVersion>1.7</compilerVersion>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>com.sun.japex</groupId>
                <artifactId>japex-maven-plugin</artifactId>
                <version>1.2.3</version>
                <executions>
                    <execution>
                        <id>japex</id>
                        <goals>
                            <goal>japex</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <reportDirectory>${project.build.directory}/japex-reports</reportDirectory>
                    <html>true</html>
                    <japexConfigFiles>
                        <japexConfigFile>${basedir}/scripts/DirectoryFilesReadDriver.japex.xml</japexConfigFile>
                    </japexConfigFiles>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

A similar question about multithreaded file reading in Java can be found on Stack Overflow: https://stackoverflow.com/questions/33369828/
