我在这里有这个数据集(例如,学生在几年内多次参加考试,要么通过,要么失败 - 我有兴趣研究上一次测试对下一次测试的影响):
id = sample.int(10000, 100000, replace = TRUE)
res = c(1,0)
results = sample(res, 100000, replace = TRUE)
date_exam_taken = sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)
my_data = data.frame(id, results, date_exam_taken)
my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]
my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL
id results date_exam_taken exam_number
7992 1 1 2004-04-23 1
24837 1 0 2004-12-10 2
12331 1 1 2007-01-19 3
34396 1 0 2007-02-21 4
85250 1 0 2007-09-26 5
11254 1 1 2009-12-20 6
我编写了这个标准的 FOR LOOP,一切似乎都工作正常:
my_list = list()
for (i in 1:length(unique(my_data$id)))
{
{tryCatch({
start_i = my_data[my_data$id == i,]
pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
frame_i = as.data.frame(table(pairs_i))
frame_i$id = i
print(frame_i)
my_list[[i]] = frame_i
}, error = function(e){})
}}
final_a = do.call(rbind.data.frame, my_list)
现在,我尝试使用 R 中的“doParallel”库来“优化”此循环。
使用这篇文章( How to transform a "for loop" in a "foreach" loop in R? )作为教程,我尝试按如下方式转换我的循环:
# does this mean I should set makeCluster() to makeCluster(8)???
> detectCores()
[1] 8
my_list = list()
max = length(unique(my_data$id))
library(doParallel)
registerDoParallel(cl <- makeCluster(3))
# note: for some reason, this loop isn't printing?
test = foreach(i = 1:max, .combine = "rbind") %dopar% {
{tryCatch({
start_i = my_data[my_data$id == i,]
pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
frame_i = as.data.frame(table(pairs_i))
frame_i$id = i
print(frame_i)
my_list[[i]] = frame_i
}, error = function(e){})
}}
final_b = do.call(rbind.data.frame, test)
基于此 - 我有以下问题:
我是否按照预期正确使用了“doParallel”功能?
还有更好的方法吗?
注意:我希望在包含大约 1000 万个唯一 ID 的数据集上运行此代码
最佳答案
这是一种将并行代码编写为函数的方法。
我事先按 id 拆分数据,而不是将每个 id 与当前索引 i
进行比较。这可以节省一些时间。它还节省了仅提取一次结果
向量的时间。
我不知道为什么,我在并行代码中没有发现任何错误,但是最终的data.frame不等于顺序输出final_a
,它有更多行。
这取决于系统,但正如您在计时中看到的,6 核运行速度最快。
library(parallel)
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators
parFun <- function(my_data, ncores) {
split_data <- split(my_data, my_data$id)
registerDoParallel(cl <- makeCluster(ncores))
on.exit(stopCluster(cl))
test <- foreach(i = seq_along(split_data)) %dopar% {
start_i_results <- split_data[[i]]$results
n <- length(start_i_results)
if(n > 1L) {
tryCatch({
pairs_i <- data.frame(first = start_i_results[-n],
second = start_i_results[-1L])
frame_i <- as.data.frame(table(pairs_i))
frame_i$id <- i
frame_i
}, error = function(e) {e})
} else NULL
}
final_b <- do.call(rbind.data.frame, test)
final_b
}
set.seed(2022)
id <- sample.int(10000, 100000, replace = TRUE)
res <- c(1,0)
results <- sample(res, 100000, replace = TRUE)
date_exam_taken <- sample(seq(as.Date('1999/01/01'), as.Date('2020/01/01'), by="day"), 100000, replace = TRUE)
my_data <- data.frame(id, results, date_exam_taken)
my_data <- my_data[order(my_data$id, my_data$date_exam_taken),]
my_data$general_id = 1:nrow(my_data)
my_data$exam_number = ave(my_data$general_id, my_data$id, FUN = seq_along)
my_data$general_id = NULL
t0 <- system.time({
my_list = list()
for (i in 1:length(unique(my_data$id)))
{
{tryCatch({
start_i = my_data[my_data$id == i,]
pairs_i = data.frame(first = head(start_i$results, -1), second = tail(start_i$results, -1))
frame_i = as.data.frame(table(pairs_i))
frame_i$id = i
# print(frame_i)
my_list[[i]] = frame_i
}, error = function(e){})
}}
final_a = do.call(rbind.data.frame, my_list)
})
ncores <- detectCores()
# run with 3 cores
t3 <- system.time(parFun(my_data, 3L))
# run with 6 cores and save the result in `res6`
t6 <- system.time(res6 <- parFun(my_data, ncores - 2L))
rbind(t0, t3, t6)[,1:3]
#> user.self sys.self elapsed
#> t0 12.86 1.00 15.37
#> t3 3.50 0.22 8.37
#> t6 3.61 0.46 7.65
head(final_a, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
head(res6, 10)
#> first second Freq id
#> 1 0 0 2 1
#> 2 1 0 3 1
#> 3 0 1 4 1
#> 4 1 1 0 1
#> 5 0 0 5 2
#> 6 1 0 3 2
#> 7 0 1 2 2
#> 8 1 1 0 2
#> 9 0 0 0 3
#> 10 1 0 1 3
str(final_a)
#> 'data.frame': 38945 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
str(res6)
#> 'data.frame': 38949 obs. of 4 variables:
#> $ first : Factor w/ 2 levels "0","1": 1 2 1 2 1 2 1 2 1 2 ...
#> $ second: Factor w/ 2 levels "0","1": 1 1 2 2 1 1 2 2 1 1 ...
#> $ Freq : int 2 3 4 0 5 3 2 0 0 1 ...
#> $ id : int 1 1 1 1 2 2 2 2 3 3 ...
创建于 2022 年 12 月 11 日 reprex v2.0.2
编辑
以下版本似乎更快。
parFun2 <- function(my_data, ncores) {
registerDoParallel(cl <- makeCluster(ncores))
on.exit(stopCluster(cl))
results_list <- split(my_data$results, my_data$id)
test <- foreach(i = seq_along(results_list)) %dopar% {
start_i_results <- results_list[[i]]
n <- length(start_i_results)
if(n > 1L) {
tbl <- table(first = start_i_results[-n],
second = start_i_results[-1L])
frame_i <- as.data.frame(tbl)
frame_i$id <- i
frame_i
} else NULL
}
data.table::rbindlist(test)
}
关于r - 正确地将 for 循环转换为并行循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74759033/