r - 在 r 中使用 Parallel 和 fread 同时按 block 导入 CSV

标签 r data.table

我知道如何打开连接并使用 read.table 读取数据 block [编辑:fread does not allow connections ],删除一些行并将结果数据按顺序收集在列表中。但是有没有其他方法可以对其进行优化,以便可以在 fread 中读取 block 并同时进行处理?

我使用的是 Windows。

到目前为止,根据我在网上收集的信息,我可以使用 Cygwin -split- 将大型 csv 文件拆分为多个较小的 csv 文件,然后使用 parLapply 读取所有文件。

你们有更好的主意吗?

最佳答案

这里尝试并行化对数据 block 的 fread 调用。该解决方案大量引用了来自

的元素

TryCatch with parLapply (Parallel package) in R

import large number of .txt files to data.frame, include empty .txt files by giving them a row data.frame

require(data.table)
require(dplyr)
require(parallel)

gc()

#=========================================================================
# generating test data
#=========================================================================

set.seed(1)
m   <- matrix(rnorm(1e5),ncol=2)
csv <- data.frame(x=1:1e2,m)
names(csv) <- c(letters[1:3])
head(csv)
write.csv(csv,"test.csv")

#=========================================================================
# defining function to read chunks of data with fread: fread_by_chunks
#=========================================================================

fread_by_chunks <-  function(filepath, counter, ChunkSize, ...) {

    chunk <- as.character({(counter-1)/ChunkSize}+1)   
    print(paste0("Working on chunk ", chunk, "..."))

    DT <- tryCatch(fread(filepath, 
                         skip=counter, 
                         nrows=ChunkSize, 
                         ...), 
                   error=function(e) message(conditionMessage(e)))

    # This condition checks that no errors occured
    if(!class(DT)[1]=="data.table"){ 
      DT <- data.table(cbind(chunk=chunk,is.empty="YES"))
    # Just in case files are still empty even though no error  
    } else if(nrow(DT)==0){ 
      DT <- data.table(cbind(chunk=chunk,is.empty="YES"))
    # Apply filter here using column indexes DT[DT[[ncol]]] as columns are not named, automatic names (Vs) do not work.
    } else {
      DT[,chunk := chunk]
      DT[,is.empty := "NO"]
    }
    return(DT)
  }

#=========================================================================
# testing fread_by_chunks
#=========================================================================

ChunkSize = 1000
n_rows = 60000 # test.csv has 50e3 lines, we want to test if the code breaks with a call to nrows above that. 
## In this step you have to make a guess as to how many rows there are in the dataset you are reading in. Guess a large number to make sure all the lines will be read. When the number of rows in your guess is above the actual number, the code will return a row with the field is.empty == "YES". You just have to delete these rows afterwards. If no such rows are there you cannot be sure you have read all the rows from the csv file. 

counter <- c(0, seq(ChunkSize, n_rows, ChunkSize)) + 1

start_time <- Sys.time()
test <- lapply(counter, function(x) {fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = ChunkSize, header = F, fill = T, blank.lines.skip=T, select=c(1,2,4))})
Sys.time() - start_time
##Time difference of 0.2528741 secs

# binding chunks
test <- bind_rows(test)

#=========================================================================
# parallelizing fread_by_chunks
#=========================================================================

no_cores <- detectCores() - 1 # 3 cores, 2.8 Ghz
cl <- makeCluster(no_cores)
clusterExport(cl, c("data.table", "ChunkSize", "counter", "fread_by_chunks", "n_rows"))
clusterEvalQ(cl, library(data.table))

start_time <- Sys.time()
test <- parLapply(cl, counter, function(x) {fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = 1000, header = F, fill = T, blank.lines.skip=T, select=c(1,2,4))})
Sys.time() - start_time
##Time difference of 0.162251 secs
stopCluster(cl)

test <- bind_rows(test)  

# just calling fread without blocks. It obviously takes a lot less time, but we have memory to fit all the data.

start_time <- Sys.time()
test <- fread("test.csv", 
              skip=0, 
              nrows=ChunkSize, 
              header=F, 
              fill = T, 
              blank.lines.skip=T, 
              select=c(1,2,4))
Sys.time() - start_time
#Time difference of 0.006005049 secs

关于r - 在 r 中使用 Parallel 和 fread 同时按 block 导入 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48675749/

相关文章:

r - 无法在大小为 11.8 Gb 的 R 中分配向量

r - 用短语构建语料库

r - 对两个数据帧中的两列进行条件 setdiff(全部到全部),并具有用于进行匹配的数字范围

r - data.table 中类似转置的过程

r - strsplit() 的行为与字符串开头和结尾的空格不同

r - 整数列和列表列中的整数值之间的差异

r - 使用 `fread` 对#N/A 的错误解释

r - 如何创建具有列子集的平均差异的列?

r data.table 函数式编程/元编程/语言计算

r - 如何在 Travis CI 上为 R 包运行 covr::codecov()