java - 从HDFS传输文件与将文件复制到本地磁盘

标签 java hadoop hdfs heap-memory

在我的Java应用程序中,我正在使用一个文本文件(大小约为300 MB),该文件保存在HDFS中。文件的每一行都包含一个字符串和一个以逗号分隔的整数ID。我正在逐行读取文件并从中创建Hashmaps(String,ID)。

该文件如下所示:

String1,Integer1
String2,Integer2
...

现在,我目前正在使用Apacha Hadoop配置和FileSystem Object直接从HDFS读取文件。
Configuration conf = new Configuration();
conf.addResource("core-site.xml"));
conf.addResource("hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

path= "<some location in HDFS>"
FileSystem fs = FileSystem.get(URI.create(path), conf);
in = fs.open(new Path(path));

输入流“in”被传递给另一个称为 read(InputStream in)的函数,用于读取文件。
  public void init(InputStream is) throws Exception {
    ConcurrentMap<String, String> pageToId = new ConcurrentHashMap();
    ConcurrentMap<String, String> idToPage = new ConcurrentHashMap();
    logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
    InputStreamReader stream = new InputStreamReader(is, StandardCharsets.UTF_8);
    BufferedReader reader = new BufferedReader(stream);
    List<String> pageIdMappingColumns = ServerProperties.getInstance().getIdMappingColumns();
    String line;
    int line_no=0;

    while (true) {
        try {
            line = reader.readLine();

            if (line == null) {
                break;
            }
            line_no++;
            //System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
            String[] values = line.split(COMMA);
            //System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
            if (values.length < pageIdMappingColumns.size()) {
                throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
            }

            String id = EMPTY_STR;
            String page = EMPTY_STR;
            for (int i = 0; i < values.length; i++) {
                String s = values[i].trim();
                if (PAGEID.equals(pageIdMappingColumns.get(i))) {
                    id = s;
                    continue;
                }
                if (PAGENAME.equals(pageIdMappingColumns.get(i))) {
                    page = s;
                }
            }
            pageToId.put(page, id);
            idToPage.put(id, page);
        } catch (Exception e) {
            logger.error(PAGEMAPPER_INIT + e.toString() + " on line " + line_no);

        }
    }
    logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
    logger.info("Total number of lines: " + line_no);
    reader.close();
    ConcurrentMap<String, String> oldPageToId = pageToIdRef.get();
    ConcurrentMap<String, String> oldIdToPage = idToPageRef.get();
    idToPage.put(MINUS_1, START);
    idToPage.put(MINUS_2, EXIT);
    pageToId.put(START, MINUS_1);
    pageToId.put(EXIT, MINUS_2);

    /* Update the Atomic reference hashmaps in memory in two conditions
    1. If there was no map in memory(first iteration)
    2. If the number of page-names and page-id pairs in the mappings.txt file are more than the previous iteration
    */

    if (oldPageToId == null || oldIdToPage != null && oldIdToPage.size() <= idToPage.size() && oldPageToId.size() <= pageToId.size()) {
        idToPageRef.set(idToPage);
        pageToIdRef.set(pageToId);
        logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_UPDATE_MAPPING);
    } else {
        logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_LOG_MSZ);
    }
}

当工作完成时,我将关闭流:
IOUtils.closeQuietly(is);

自从该文件在HDFS期间被更改以来,我每1小时执行一次以上代码。现在,我得到了java.lang.OutOfMemoryError:Java堆空间。

我的问题是:就内存需求而言,将文件复制到磁盘然后使用它比直接从HDFS直接访问它更好吗?

注意:文件有> 3200000行。

最佳答案

流始终是选择的方式。

之所以收到OutOfMemory,是因为您从不关闭流,因此会导致内存泄漏。

手动关闭流或使用try-with-resource

编辑

pageToId.put(page, id);
idToPage.put(id, page);

您将文件大小至少存储了两倍。大约是600MB。

之后,您将该值分配给一些ref变量:
idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);

我猜您仍然在某处引用旧的ref数据,因此不会发布内部 map 数据。

您也有资源泄漏
throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);

您应该使用try-with-resource或在finally块中手动关闭流。

关于java - 从HDFS传输文件与将文件复制到本地磁盘,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50355951/

相关文章:

java - 关闭 Spring MVC 数据绑定(bind)器

Hadoop - HDFS - 查看文件如何拆分的命令

hadoop - hdfs 数据已损坏。无法删除损坏的文件夹,因为它显示没有这样的文件或目录

testing - 如何从终端运行hadoop hdfs测试用例

hadoop - 使用 HBASE 的 Spark 与使用 HDFS 的 Spark

java - 从 HDFS 加载 key 表

java - 使用 Google Maps Android 标记聚类实用程序时无法实例化 appComponentFactory

java - JPA 2.0 Provider Hibernate 3.6 for DB2 v9.5 type 2 驱动程序在配置准备中抛出异常

java - 从 SQLServer 到 Java 的通知

java - 使用JAVA将字节流传输到HDFS