r - 在 R 中流式处理大型 csv 文件

标签 r csv chunks

我需要对一个非常大的 csv 文件 (c.8.5GB) 进行一些相对简单的更改。我最初尝试使用各种读取器函数:read.csv、readr::read.csv、data.table::fread。但是:它们都耗尽了内存。

我想我需要改用流处理方法;读取一个块,更新它,写它,重复。我找到了 this answer这是在正确的行上;但是我不知道如何终止循环(我对 R 比较陌生)。

所以我有两个问题:

  • 使 while 循环工作的正确方法是什么?
  • 有没有更好的方法(对于“更好”的某些定义)?例如有没有办法使用 dplyr 和管道来做到这一点?

  • 当前代码如下:
    src_fname <- "testdata/model_input.csv"
    tgt_fname <- "testdata/model_output.csv"
    
    #Changes needed in file: rebase identifiers, set another col to constant value
    rebase_data <- function(data, offset) {
      data$'Unique Member ID' <- data$'Unique Member ID' - offset
      data$'Client Name' <- "TestClient2"
      return(data)
    }
    
    CHUNK_SIZE <- 1000
    src_conn = file(src_fname, "r")
    data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE)
    cols <- colnames(data)
    offset <- data$'Unique Member ID'[1] - 1
    
    data <- rebase_data(data, offset)
    #1st time through, write the headers
    tgt_conn = file(tgt_fname, "w")
    write.csv(data,tgt_conn, row.names=FALSE)
    
    #loop over remaining data
    end = FALSE
    while(end == FALSE) {
      data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols)
      data <- rebase_data(data, offset)
      #write.csv doesn't support col.names=FALSE; so use write.table which does
      write.table(data, tgt_conn, row.names=FALSE, col.names=FALSE, sep=",")
      # ??? How to test for EOF and set end = TRUE if so  ???
      # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop?
      if (nrow(data) < CHUNK_SIZE) {
        end <- TRUE
      }
    
    }
    close(src_conn)
    close(tgt_conn)
    

    感谢您的指点。

    最佳答案

    很抱歉戳了一个 2 年前的帖子,但现在是 readr::read_csv_chunked (在加载 dplyr 时与 tidyverse 一起自动加载),我们也可以这样做:

    require(tidyverse)
    
    ## For non-exploratory code, as @antoine-sac suggested, use:
    # require(readr)  # for function `read_csv_chunked` and `read_csv`
    # require(dplyr)  # for the pipe `%>%` thus less parentheses
    
    src_fname = "testdata/model_input.csv"
    tgt_fname = "testdata/model_output.csv"
    
    CHUNK_SIZE = 1000
    
    offset = read_csv(src_fname, n_max=1)$comm_code %>% as.numeric() - 1 
    
    rebase.chunk = function(df, pos) {
      df$comm_code = df$comm_code %>% as.numeric() - offset
      df$'Client Name' = "TestClient2"
      is.append = ifelse(pos > 1, T, F)
      df %>% write_csv(
        tgt_fname,
        append=is.append
      )
    }
    
    read_csv_chunked(
      src_fname, 
      callback=SideEffectChunkCallback$new(rebase.chunk), 
      chunk_size = chunck.size,
      progress = T    # optional, show progress bar
    )
    

    这里棘手的部分是设置 is.append基于参数 pos ,表示数据帧的起始行号df原始文件中。内 readr::write_csv , 当 append=F标题(列名)将写入文件,否则不会。

    关于r - 在 R 中流式处理大型 csv 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42492351/

    相关文章:

    逐行排序然后跨数据框的特定列连接

    arrays - 根据数据中的关系对数组进行切片(在 Ruby 中)

    IOS录制音频并实时分割成文件

    r - 避免在 R 中覆盖文本文件

    r - 当 plot() 返回多个图时如何仅获取某些图

    python - 如何在 R/Python 中迭代/循环大型(>2GB)JSON 数据集?

    MYSQL+加载数据INFILE+检查字段

    r - 为具有相同 'tidy' 格式和大小的不断变化的输入 csv 文件有效地创建 data.frames

    ios - 将 CSV 文件保存到文档目录文件夹

    R Markdown block 字体颜色已更改