r - doParallel(包)foreach 不适用于 R 中的大迭代

标签 r parallel-processing parallel-foreach doparallel

我正在分别具有 4 个和 8 个物理核心和逻辑核心的 PC(Linux 操作系统)上运行以下代码(摘自 doParallel's Vignettes )。

使用 iter=1e+6 或更少运行代码,一切都很好,我可以从 CPU 使用情况看到所有核心都用于此计算。然而,当迭代次数较多时(例如iter=4e+6),并行计算似乎在这种情况下不起作用。当我还监控 CPU 使用率时,只有一个核心参与计算(100% 使用率)。

示例1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]

您知道可能是什么原因吗?内存可能是原因吗?

我用谷歌搜索了一下,发现THIS与我的问题相关,但重点是我没有收到任何类型的错误,并且OP似乎通过在 foreach 循环内提供必要的包来提出解决方案。但可以看出,我的循环内没有使用任何包。

更新1

我的问题还没有解决。根据我的实验,我不认为内存可能是原因。我的系统有 8GB 内存,我在该系统上运行以下简单并行(在所有 8 个逻辑核心上)迭代:

示例2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]

运行此代码没有问题,但当我监控 CPU 使用率时,只有一个核心(8 个核心)为 100%。

更新2

至于Example2,@SteveWeston(感谢您指出这一点)指出(在评论中):“您更新中的示例正面临着微小任务的困扰。只有大师才有真正的工作要做的事情,包括发送任务和处理结果。这与原始示例的问题有根本的不同,原始示例在较少的迭代次数上使用了多个核心。”

但是,示例1仍然没有解决。当我运行它并使用 htop 监视进程时,以下是更详细的情况:

让我们将所有 8 个创建的进程命名为 p1p8p1 的状态(htop 中的 S 列)为 R,表示它正在运行且保持不变。但是,对于 p2p8,几分钟后,状态会更改为 D(即不间断 sleep ),几分钟后,状态再次更改为 D更改为 Z (即终止但未被其父级收割)。您知道为什么会发生这种情况吗?

最佳答案

我认为您的内存不足。这是该示例的修改版本,当您有很多任务时,它应该可以更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您在工作人员返回结果时使用组合函数处理结果。此示例将这些结果写入文件以便使用更少的内存,但它最后使用“.final”函数将结果读回内存,但如果内存不足,您可以跳过该操作。

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }

我添加了一个进度条,因为这个示例需要几个小时才能执行。

请注意,此示例还使用了 iterators 包中的 idiv 函数来增加每个任务的工作量。这种技术称为分块,通常可以提高并行性能。但是,使用 idiv 会弄乱任务索引,因为变量 i 现在是每个任务索引而不是全局索引。对于全局索引,您可以编写一个包装 idiv 的自定义迭代器:

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIterator'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }

当然,如果任务的计算强度足够大,您可能不需要分块,并且可以像原始示例一样使用简单的 foreach 循环。

关于r - doParallel(包)foreach 不适用于 R 中的大迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37750937/

相关文章:

r - 如何在 foreach 中以编程方式在 %do% 和 %dopar% 之间切换?

r - 在 R foreach() 下并行运行时无法识别动态库依赖项

parallel-processing - 使用 GNU parallel 并行化具有各种参数的脚本

R并行: rbind parallely into separate data.帧

r - 调整通过ggplot()和facet_grid绘制的面板的大小

r - 从 R 代码调用中断

vb.net - Parallel.ForEach 每次给出不同的结果

haskell - Haskell 中的并行图形处理

r - 如何在 ifelse 语句中更新我的sample()?

R AND 运算符到新列