I need to make a couple of relatively simple changes to a very large csv file (c. 8.5GB). I initially tried various reader functions: read.csv, readr::read_csv, data.table::fread. However, they all ran out of memory.
I think I need to use a stream-processing approach instead: read a chunk, update it, write it out, repeat. I found this answer, which is along the right lines; however, I don't know how to terminate the loop (I'm relatively new to R).
So I have two questions: how should the loop termination be written, and is there a better way to do this altogether?

Current code as follows:
src_fname <- "testdata/model_input.csv"
tgt_fname <- "testdata/model_output.csv"

# Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) {
  data$'Unique Member ID' <- data$'Unique Member ID' - offset
  data$'Client Name' <- "TestClient2"
  return(data)
}

CHUNK_SIZE <- 1000
src_conn = file(src_fname, "r")
data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE)
cols <- colnames(data)
offset <- data$'Unique Member ID'[1] - 1
data <- rebase_data(data, offset)

# 1st time through, write the headers
tgt_conn = file(tgt_fname, "w")
write.csv(data, tgt_conn, row.names = FALSE)

# loop over remaining data
end = FALSE
while (end == FALSE) {
  data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE, col.names = cols)
  data <- rebase_data(data, offset)
  # write.csv doesn't support col.names=FALSE; so use write.table which does
  write.table(data, tgt_conn, row.names = FALSE, col.names = FALSE, sep = ",")
  # ??? How to test for EOF and set end = TRUE if so ???
  # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop?
  if (nrow(data) < CHUNK_SIZE) {
    end <- TRUE
  }
}
close(src_conn)
close(tgt_conn)
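For reference, a minimal sketch of one way to terminate such a loop (an editor's sketch under stated assumptions, not part of the original question): base R's read.csv() signals an error once the connection has no lines left, so the read can be wrapped in tryCatch(); passing header = FALSE also stops each continuation chunk from losing its first row to the header parser. Only the loop portion is shown; the setup and close() calls stay as above.

end <- FALSE
while (!end) {
  data <- tryCatch(
    # header = FALSE: continuation chunks carry no header line to consume
    read.csv(src_conn, nrows = CHUNK_SIZE, header = FALSE,
             check.names = FALSE, col.names = cols),
    error = function(e) NULL  # reading past EOF raises an error; map it to NULL
  )
  if (is.null(data)) break  # file length was an exact multiple of CHUNK_SIZE
  data <- rebase_data(data, offset)
  write.table(data, tgt_conn, row.names = FALSE, col.names = FALSE, sep = ",")
  if (nrow(data) < CHUNK_SIZE) end <- TRUE  # short chunk means end of file
}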
Grateful for any pointers.
Best Answer
Sorry to poke a 2-year-old thread, but now with readr::read_csv_chunked (loaded automatically, along with dplyr, when you load tidyverse), we can also do it like this:
require(tidyverse)

## For non-exploratory code, as @antoine-sac suggested, use:
# require(readr)  # for function `read_csv_chunked` and `read_csv`
# require(dplyr)  # for the pipe `%>%` thus less parentheses

src_fname = "testdata/model_input.csv"
tgt_fname = "testdata/model_output.csv"
CHUNK_SIZE = 1000

offset = read_csv(src_fname, n_max = 1)$comm_code %>% as.numeric() - 1

rebase.chunk = function(df, pos) {
  df$comm_code = df$comm_code %>% as.numeric() - offset
  df$'Client Name' = "TestClient2"
  # append from the second chunk onwards, so the header is written only once
  is.append = ifelse(pos > 1, T, F)
  df %>% write_csv(
    tgt_fname,
    append = is.append
  )
}

read_csv_chunked(
  src_fname,
  callback = SideEffectChunkCallback$new(rebase.chunk),
  chunk_size = CHUNK_SIZE,
  progress = T  # optional, show progress bar
)
The tricky part here is setting is.append based on the parameter pos, which gives the starting row number of the data frame df within the original file. Inside readr::write_csv, when append=F the header (column names) is written to the file; otherwise it is not.
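A minimal sketch (not from the original answer; it assumes a hypothetical demo.csv with a header row plus five data rows) showing how pos advances chunk by chunk:

library(readr)

# Print the starting position reported for each chunk of a 5-row file,
# reading two rows at a time.
read_csv_chunked(
  "demo.csv",
  callback = SideEffectChunkCallback$new(function(df, pos) print(pos)),
  chunk_size = 2
)
# Prints 1, 3, 5: `pos` counts data rows (header excluded),
# so `pos > 1` is TRUE for every chunk after the first.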
Related question on Stack Overflow (r - streaming a large csv file in R): https://stackoverflow.com/questions/42492351/