R:使用 "doparallel"、 "foreach"和 "purrr"库重写循环

标签 r loops for-loop dplyr parallel-processing

我在 R 中编写了以下代码,它对一些人工生成的数据循环执行一系列数据处理操作(最终输出称为“final_results”):

#load library
    library(dplyr)

library(data.table)

set.seed(123)

# Build the example data set: two normally distributed columns and one column
# of integers drawn with replacement from 1..1000.
a1 <- rnorm(n = 1000, mean = 100, sd = 10)
b1 <- rnorm(n = 1000, mean = 100, sd = 5)
c1 <- sample.int(n = 1000, size = 1000, replace = TRUE)
train_data <- data.frame(a1 = a1, b1 = b1, c1 = c1)


####
# Simulation loop: every iteration draws fresh random thresholds, bins
# `train_data` into the categories "a"/"b"/"c", and records for each bin the
# share of rows whose `c1` lies below a randomly chosen quantile of that bin.
# Per-iteration results are stored in a pre-sized list and combined once after
# the loop, instead of growing a data frame with rbind() and re-running dcast()
# on every pass (only the last dcast() result was ever used).
n_iterations <- 10
iteration_results <- vector("list", n_iterations)

for (i in seq_len(n_iterations)) {

    # random binning thresholds (random_2 >= random_1, random_4 >= random_3)
    random_1 <- runif(1, 80, 120)
    random_2 <- runif(1, random_1, 120)
    random_3 <- runif(1, 85, 120)
    random_4 <- runif(1, random_3, 120)

    # bin data according to the random criteria
    train_data <- train_data %>%
        mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a",
                            ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
    train_data$cat <- as.factor(train_data$cat)

    # one table per bin
    a_table <- train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)

    b_table <- train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)

    c_table <- train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)

    # random quantile probability for each bin
    split_1 <- runif(1, 0, 1)
    split_2 <- runif(1, 0, 1)
    split_3 <- runif(1, 0, 1)

    # quantile ("quant") of c1 at that probability, per bin
    # (argument spelled out as `probs` rather than relying on partial matching)
    table_a <- data.frame(a_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_1)))

    table_b <- data.frame(b_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_2)))

    table_c <- data.frame(c_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_3)))

    # "diff" is 1 when the bin's quantile is bigger than the row's c1, else 0
    table_a$diff <- ifelse(table_a$quant > table_a$c1, 1, 0)
    table_b$diff <- ifelse(table_b$quant > table_b$c1, 1, 0)
    table_c$diff <- ifelse(table_c$quant > table_c$c1, 1, 0)

    # stack the three bins again
    final_table <- rbind(table_a, table_b, table_c)

    # for each bin, the average of "diff"
    final_table_2 <- data.frame(final_table %>%
                                    group_by(cat) %>%
                                    summarize(
                                        mean = mean(diff)
                                    ))

    # add the overall average as an extra "total" row
    final_table_2 <- data.frame(final_table_2 %>%
                                    add_row(cat = "total", mean = mean(final_table$diff)))

    # keep the random criteria next to the results for reference
    final_table_2$random_1 <- random_1
    final_table_2$random_2 <- random_2
    final_table_2$random_3 <- random_3
    final_table_2$random_4 <- random_4
    final_table_2$split_1 <- split_1
    final_table_2$split_2 <- split_2
    final_table_2$split_3 <- split_3
    final_table_2$iteration_number <- i

    iteration_results[[i]] <- final_table_2
}

# combine all iterations once, then reshape to one row per iteration with one
# column per bin (setDT() converts results_table to a data.table in place)
results_table <- bind_rows(iteration_results)

final_results <- dcast(setDT(results_table), iteration_number + random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')

上面的循环工作得很好 - 但我正在尝试了解有关 R 的更多信息,并尝试使用其他库(例如 "doParallel"、"foreach" 和 "purrr")中的函数来重写此循环。

选项 1:

我在 R 中遇到了以下代码,它显示了使用“purrr”库编写循环的通用模板(显然“map_df”是一个使用循环中的代码的函数):

#option 1
library(dplyr)
library(purrr)
library(tictoc)


data_gen <- function() {
  # Placeholder data generating process: 100 rows with two independent
  # uniform(0, 1) columns. Replace the draws below with your own process.
  x_draws <- runif(100)
  y_draws <- runif(100)
  tibble(x = x_draws, y = y_draws)
}

N <- 10000 # number of datasets to be generated


# tic()/toc() are optional; they only time the code between them
tic('method A')
# one piece per index value; data_gen() is called once for each piece and the
# generated tibbles are row-bound into `output`
output <- map_df(
  split(tibble(i = seq_len(N)), seq_len(N)),
  ~data_gen()
)
toc()

但是,我不确定如何修改此代码以适合我的示例。我首先创建了 map_df 函数:

#create map_df function:

# Attempted rewrite of the loop body as a zero-argument function.
# NOTE(review): `map_df` is also the name of the purrr function used by the
# template; this definition accepts no arguments, which matches the
# "unused arguments" error reported for `map_df(., ~data_gen())`.
# NOTE(review): train_data, random_1..random_4, i and results_table are not
# created inside this function; they are looked up outside of it.
map_df <- function() {
    #bin data according to random criteria
    train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))

    train_data$cat = as.factor(train_data$cat)

    #new splits: one table per bin
    a_table = train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)

    b_table = train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)

    c_table = train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)

    #random quantile probability for each bin
    split_1 =  runif(1,0, 1)
    split_2 =  runif(1, 0, 1)
    split_3 =  runif(1, 0, 1)

    #calculate the quantile ("quant") of c1 at probability split_* for each bin

    table_a = data.frame(a_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_1)))

    table_b = data.frame(b_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_2)))

    table_c = data.frame(c_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_3)))




    #create a new variable ("diff") that measures if the quantile is bigger than the value of "c1"
    table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
    table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
    table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)

    #group all tables

    final_table = rbind(table_a, table_b, table_c)

    #create a table: for each bin, calculate the average of "diff"
    final_table_2 = data.frame(final_table %>%
                                   group_by(cat) %>%
                                   summarize(
                                       mean = mean(diff)
                                   ))

    #add "total mean" to this table
    final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))

    #format this table: add the random criteria to this table for reference
    final_table_2$random_1 = random_1

    final_table_2$random_2 = random_2

    final_table_2$random_3 = random_3

    final_table_2$random_4 = random_4

    final_table_2$split_1 = split_1

    final_table_2$split_2 = split_2

    final_table_2$split_3 = split_3

    final_table_2$iteration_number = i


    #append this run to results_table (a local copy; the outer object is not modified)
    results_table <- rbind(results_table, final_table_2)

    #reshape to one row per iteration with one column per bin; this assignment is
    #the last expression, so its value is what the function returns (invisibly)
    final_results = dcast(setDT(results_table), iteration_number + random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}

但是当我尝试运行通用模板时,它会产生以下错误:

# Data generator adapted from the template to produce the example columns.
# NOTE(review): the tibble() arguments below are written one per line without
# separating commas, and the last one wraps the columns in a data.frame again.
data_gen <- function(){ #here you insert your data generating process
    tibble(
        # create some data for this example
        a1 = rnorm(1000,100,10)
        b1 = rnorm(1000,100,5)
        c1 = sample.int(1000, 1000, replace = TRUE)
        train_data = data.frame(a1,b1,c1)
    )
}

N <- 10000 #number of datasets to be generated


tic('method A')  #not necessary, measures the time of the code between 'tic' and 'toc'
# split() yields one piece per value of i; the formula ~data_gen() ignores the
# piece itself and is meant to be evaluated once per piece by map_df()
output <- tibble(
    i = 1:N
) %>%
    split(.$i) %>%
    map_df(
        ~data_gen()
    )
toc() 

Error in map_df(., ~data_gen()) : unused arguments (., ~data_gen())

有谁知道为什么会产生这个错误?

选项 2:

我不确定如何在我的示例中使用“doParallel”和“foreach”库。似乎所有带有“doParallel”的示例都要求用户首先定义他们希望计算机使用的“核心”数量:

 library(doParallel)
 # create a cluster object (the argument 2 is the number of workers requested)
 # and register it so that foreach can use it
 cl <- makeCluster(2)
 registerDoParallel(cl)

最后,用户必须指示计算机停止该进程:

# shut the cluster down once the parallel work is finished
stopCluster(cl)

除此之外,我不确定如何使用“doParallel”和“foreach”库来使我的示例受益。

有人可以给我看看这个吗? 谢谢

最佳答案

data_gen 函数中存在一些潜在的拼写错误。

  • tibble 是由未用 , 分隔的列创建的
 tibble(
        # create some data for this example
        a1 = rnorm(1000,100,10) ####
        b1 = rnorm(1000,100,5)####
      ...
  • 最好使用现有函数以外的名称 - map_df 已经是 purrr 中的函数名

如果我们想多次执行该函数,请使用 base R 中的 replicate 或 purrr::rerun

-函数

data_gen <- function() {
  # Data generating process for one simulated data set: 1000 rows with two
  # normally distributed columns (a1, b1) and one column (c1) of integers
  # sampled with replacement from 1..1000.
  n_rows <- 1000
  a1 <- rnorm(n_rows, mean = 100, sd = 10)
  b1 <- rnorm(n_rows, mean = 100, sd = 5)
  c1 <- sample.int(1000, size = n_rows, replace = TRUE)
  tibble::tibble(a1 = a1, b1 = b1, c1 = c1)
}

-使用的包

library(purrr)
library(dplyr)
library(doSNOW)
library(parallel)

-顺序运行

# Generate N data sets sequentially and stack them; the `grp` column records
# which replicate ("1".."N") each row came from.
# purrr::rerun() is deprecated as of purrr 1.0.0, so iterate with map() instead.
N <- 10
out <- seq_len(N) %>%
  map(~ data_gen()) %>%
  bind_rows(.id = "grp")

-并行运行

# start one socket worker per detected core and register it for %dopar%
no_of_cores <- detectCores()
cl <- makeSOCKcluster(no_of_cores)
registerDoSNOW(cl)

# each iteration builds one data set on a worker; the pieces are row-bound
# into out2
out2 <- foreach(
  i = seq_len(N),
  .packages = "tibble",
  .multicombine = TRUE,
  .combine = "rbind"
) %dopar% {
  data_gen()
}

stopCluster(cl)

-检查行数

# N = 10 replicates of 1000 rows each
nrow(out)
#[1] 10000
nrow(out2)
#[1] 10000

上面的函数只是为了展示如何顺序和并行运行简单的函数data_gen。借助OP的完整函数,我们可以在外部函数(map_new_fn)内调用data_gen(),并且该函数可以并行或顺序调用

# Run one simulation: generate a data set with data_gen(), bin it with random
# thresholds and summarise each bin.
#
# Args:
#   i: optional iteration label stored in the `iteration_number` column.
#      Defaults to NA so that existing zero-argument calls keep working.
#
# Returns:
#   A one-row data.table holding `iteration_number`, the four random
#   thresholds, the three quantile probabilities, and one column per bin that
#   occurs ("a", "b", "c") plus "total", each containing the mean of `diff`.
map_new_fn <- function(i = NA_integer_) {

    # Draw the binning thresholds here instead of reading random_1..random_4
    # from the calling environment: the function is then self-contained and
    # also works in a fresh session and on parallel workers.
    random_1 <- runif(1, 80, 120)
    random_2 <- runif(1, random_1, 120)
    random_3 <- runif(1, 85, 120)
    random_4 <- runif(1, random_3, 120)

    # bin a freshly generated data set according to the random criteria
    train_data <- data_gen()
    train_data <- train_data %>%
        mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a",
                            ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
    train_data$cat <- as.factor(train_data$cat)

    # one table per bin
    a_table <- train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)

    b_table <- train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)

    c_table <- train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)

    # random quantile probability for each bin
    split_1 <- runif(1, 0, 1)
    split_2 <- runif(1, 0, 1)
    split_3 <- runif(1, 0, 1)

    # quantile ("quant") of c1 at that probability, per bin
    # (argument spelled out as `probs` rather than relying on partial matching)
    table_a <- data.frame(a_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_1)))

    table_b <- data.frame(b_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_2)))

    table_c <- data.frame(c_table %>% group_by(cat) %>%
                              mutate(quant = quantile(c1, probs = split_3)))

    # "diff" is 1 when the bin's quantile is bigger than the row's c1, else 0
    table_a$diff <- ifelse(table_a$quant > table_a$c1, 1, 0)
    table_b$diff <- ifelse(table_b$quant > table_b$c1, 1, 0)
    table_c$diff <- ifelse(table_c$quant > table_c$c1, 1, 0)

    # stack the three bins again
    final_table <- rbind(table_a, table_b, table_c)

    # for each bin, the average of "diff"
    final_table_2 <- data.frame(final_table %>%
                                    group_by(cat) %>%
                                    summarize(
                                        mean = mean(diff)
                                    ))

    # add the overall average as an extra "total" row
    final_table_2 <- data.frame(final_table_2 %>%
                                    add_row(cat = "total", mean = mean(final_table$diff)))

    # keep the random criteria next to the results for reference
    final_table_2$random_1 <- random_1
    final_table_2$random_2 <- random_2
    final_table_2$random_3 <- random_3
    final_table_2$random_4 <- random_4
    final_table_2$split_1 <- split_1
    final_table_2$split_2 <- split_2
    final_table_2$split_3 <- split_3
    final_table_2$iteration_number <- i

    # reshape to a single row with one column per bin; a single run needs no
    # accumulation step, so the summary table is cast directly
    final_results <- dcast(setDT(final_table_2), iteration_number + random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
    final_results
}

-按顺序运行

# Run the full simulation N times sequentially and stack the one-row results;
# the `grp` column records which run ("1".."N") each row came from.
# purrr::rerun() is deprecated as of purrr 1.0.0, so iterate with map() instead.
out1_new <- seq_len(N) %>%
  map(~ map_new_fn()) %>%
  bind_rows(.id = "grp")

-并行运行

# start one socket worker per detected core and register it for %dopar%
no_of_cores <- detectCores()
cl <- makeSOCKcluster(no_of_cores)
registerDoSNOW(cl)

# each iteration runs one full simulation on a worker; the one-row results are
# row-bound into out2_new
out2_new <- foreach(
  i = seq_len(N),
  .export = c("data_gen", "map_new_fn"),
  .packages = c("tibble", "dplyr", "data.table"),
  .multicombine = TRUE,
  .combine = "rbind"
) %dopar% {
  map_new_fn()
}

stopCluster(cl)

-检查输出行数

# one summary row per run, N = 10 runs (the [1] lines are console output)
nrow(out1_new)
[1] 10
nrow(out2_new)
[1] 10

关于R:使用 "doparallel"、 "foreach"和 "purrr"库重写循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68262546/

相关文章:

r - 带有分类变量的 CVlm : factor has new levels

r - 计算 R 中每一列的出现次数

r - 将参数从命令行传递到 R markdown 文档

r - 按行比较矩阵与向量中的元素

jquery - 我怎样才能让 jQuery 动画在完成后无限循环?

python - 替换列表中的所有值而不使用枚举?

loops - 是否应该在阻止来自 channel 的for-select循环时将请求对象传递给goroutine?

c++ - 如何计算数组中出现的次数?

python - 为什么 python 中的 'for' 循环会更改未引用的列表?

java - 如何使用ArrayList存储多行信息?