r - 正确地将 for 循环转换为并行循环

标签 r loops parallel-processing

我在这里有这个数据集(例如,学生在几年内多次参加考试,要么通过,要么失败 - 我有兴趣研究上一次测试对下一次测试的影响):

   id = sample.int(10000, 100000, replace = TRUE)
res = c(1,0)
results = sample(res, 100000, replace = TRUE)
date_exam_taken = sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)


my_data = data.frame(id, results, date_exam_taken)
my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

     id results date_exam_taken exam_number
7992   1       1      2004-04-23           1
24837  1       0      2004-12-10           2
12331  1       1      2007-01-19           3
34396  1       0      2007-02-21           4
85250  1       0      2007-09-26           5
11254  1       1      2009-12-20           6

我编写了这个标准的 FOR LOOP,一切似乎都工作正常:

my_list = list()

for (i in 1:length(unique(my_data$id)))
    
{ 
    {tryCatch({
        
        start_i = my_data[my_data$id == i,]
        
        pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i =  as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e){})
    }}


 final_a = do.call(rbind.data.frame, my_list)

现在,我尝试使用 R 中的“doParallel”库来“优化”此循环。

使用这篇文章( How to transform a "for loop" in a "foreach" loop in R? )作为教程,我尝试按如下方式转换我的循环:

# does this mean I should set makeCluster() to makeCluster(8)???
 > detectCores()
[1] 8

my_list = list()
max = length(unique(my_data$id))

library(doParallel)
registerDoParallel(cl <- makeCluster(3))

# note: for some reason, this loop isn't printing?

test = foreach(i = 1:max, .combine = "rbind") %dopar% {

    {tryCatch({
        
        start_i = my_data[my_data$id == i,]
        
        pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
        frame_i =  as.data.frame(table(pairs_i))
        frame_i$id = i
        print(frame_i)
        my_list[[i]] = frame_i
    }, error = function(e){})
    }}


 final_b = do.call(rbind.data.frame, test)

基于此 - 我有以下问题:

  • 我是否按照预期正确使用了“doParallel”功能?

  • 还有更好的方法吗?

  • 注意:我希望在包含大约 1000 万个唯一 ID 的数据集上运行此代码

最佳答案

这是一种将并行代码编写为函数的方法。
我事先按 id 拆分数据,而不是将每个 id 与当前索引 i 进行比较。这可以节省一些时间。它还节省了仅提取一次结果向量的时间。

我不知道为什么,我在并行代码中没有发现任何错误,但是最终的data.frame不等于顺序输出final_a,它有更多行

这取决于系统,但正如您在计时中看到的,6 核运行速度最快。

library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators

parFun <- function(my_data, ncores) {
  split_data <- split(my_data, my_data$id)
  
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  test <- foreach(i = seq_along(split_data)) %dopar% {
    start_i_results <- split_data[[i]]$results
    n <- length(start_i_results)
    if(n > 1L) {
      tryCatch({
        pairs_i <- data.frame(first = start_i_results[-n], 
                              second = start_i_results[-1L])
        frame_i <- as.data.frame(table(pairs_i))
        frame_i$id <- i
        frame_i
      }, error = function(e) {e})
    } else NULL
  }
  final_b <- do.call(rbind.data.frame, test)
  final_b
}

set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1,0)
results <- sample(res, 100000, replace = TRUE)
date_exam_taken <- sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)

my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]

my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL

t0 <- system.time({
  my_list = list()
  
  for (i in 1:length(unique(my_data$id)))
    
  { 
    {tryCatch({
      
      start_i = my_data[my_data$id == i,]
      
      pairs_i =  data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
      frame_i =  as.data.frame(table(pairs_i))
      frame_i$id = i
      # print(frame_i)
      my_list[[i]] = frame_i
    }, error = function(e){})
    }}
  final_a = do.call(rbind.data.frame, my_list)
})

ncores <- detectCores()

# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with 6 cores and save the result in `res6`
t6 <- system.time(res6 <- parFun(my_data, ncores - 2L))
rbind(t0, t3, t6)[,1:3]
#>    user.self sys.self elapsed
#> t0     12.86     1.00   15.37
#> t3      3.50     0.22    8.37
#> t6      3.61     0.46    7.65

head(final_a, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3
head(res6, 10)
#>    first second Freq id
#> 1      0      0    2  1
#> 2      1      0    3  1
#> 3      0      1    4  1
#> 4      1      1    0  1
#> 5      0      0    5  2
#> 6      1      0    3  2
#> 7      0      1    2  2
#> 8      1      1    0  2
#> 9      0      0    0  3
#> 10     1      0    1  3

str(final_a)
#> 'data.frame':    38945 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame':    38949 obs. of  4 variables:
#>  $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#>  $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#>  $ Freq  : int  2 3 4 0 5 3 2 0 0 1 ...
#>  $ id    : int  1 1 1 1 2 2 2 2 3 3 ...

创建于 2022 年 12 月 11 日 reprex v2.0.2


编辑

以下版本似乎更快。

parFun2 <- function(my_data, ncores) {
  registerDoParallel(cl <- makeCluster(ncores))
  on.exit(stopCluster(cl))
  
  results_list <- split(my_data$results, my_data$id)
  
  test <- foreach(i = seq_along(results_list)) %dopar% {
    start_i_results <- results_list[[i]]
    n <- length(start_i_results)
    if(n > 1L) {
      tbl <- table(first = start_i_results[-n], 
                   second = start_i_results[-1L])
      frame_i <- as.data.frame(tbl)
      frame_i$id <- i
      frame_i
    } else NULL
  }
  data.table::rbindlist(test)
}

关于r - 正确地将 for 循环转换为并行循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74759033/

相关文章:

R - 提高采样功能速度

css - R Shiny : Reduce bottom padding shiny dashboard box

parallel-processing - 如何为处理一组文件的并行作业设置 Kubernetes

haskell - 并行 haskell 。对生产者进行速率限制

java - Java Parallel Stream 与 ExecutorService 的性能对比

r - 从 R 中的天数得出日期?

R cumunique 喜欢 cumsum

python - 为什么字符串不变成整数?

loops - 在 FnMut 向量中迭代并调用闭包

java - Java 中的嵌套 while 循环