我正在分别具有 4 个和 8 个物理核心和逻辑核心的 PC(Linux 操作系统)上运行以下代码(摘自 doParallel's Vignettes )。
使用 iter=1e+6
或更少运行代码,一切都很好,我可以从 CPU 使用情况看到所有核心都用于此计算。然而,当迭代次数较多时(例如iter=4e+6),并行计算似乎在这种情况下不起作用。当我还监控 CPU 使用率时,只有一个核心参与计算(100% 使用率)。
示例1
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}
})[3]
您知道可能是什么原因吗?内存可能是原因吗?
我用谷歌搜索了一下,发现THIS与我的问题相关,但重点是我没有收到任何类型的错误,并且OP似乎通过在 foreach 循环内提供必要的包来提出解决方案。但可以看出,我的循环内没有使用任何包。
更新1
我的问题还没有解决。根据我的实验,我不认为内存可能是原因。我的系统有 8GB 内存,我在该系统上运行以下简单并行(在所有 8 个逻辑核心上)迭代:
示例2
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
i
}
})[3]
运行此代码没有问题,但当我监控 CPU 使用率时,只有一个核心(8 个核心)为 100%。
更新2
至于Example2,@SteveWeston(感谢您指出这一点)指出(在评论中):“您更新中的示例正面临着微小任务的困扰。只有大师才有真正的工作要做的事情,包括发送任务和处理结果。这与原始示例的问题有根本的不同,原始示例在较少的迭代次数上使用了多个核心。”
但是,示例1仍然没有解决。当我运行它并使用 htop
监视进程时,以下是更详细的情况:
让我们将所有 8 个创建的进程命名为 p1
到 p8
。 p1
的状态(htop
中的 S
列)为 R
,表示它正在运行且保持不变。但是,对于 p2
到 p8
,几分钟后,状态会更改为 D
(即不间断 sleep ),几分钟后,状态再次更改为 D
更改为 Z
(即终止但未被其父级收割)。您知道为什么会发生这种情况吗?
最佳答案
我认为您的内存不足。这是该示例的修改版本,当您有很多任务时,它应该可以更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您在工作人员返回结果时使用组合函数处理结果。此示例将这些结果写入文件以便使用更少的内存,但它最后使用“.final”函数将结果读回内存,但如果内存不足,您可以跳过该操作。
library(doSNOW)
library(tcltk)
nw <- 4 # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)
x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000 # may require tuning for your machine
maxcomb <- nw + 1 # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)
comb <- function(fobj, ...) {
for(r in list(...))
writeBin(r, fobj)
fobj
}
final <- function(fobj) {
close(fobj)
t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}
mkprogress <- function(total) {
pb <- tkProgressBar(max=total,
label=sprintf('total tasks: %d', total))
function(n, tag) {
setTkProgressBar(pb, n,
label=sprintf('last completed task: %d of %d', tag, total))
}
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')
r <-
foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
.maxcombine=maxcomb, .init=resultFile, .final=final,
.inorder=FALSE, .options.snow=opts) %dopar% {
do.call('c', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
我添加了一个进度条,因为这个示例需要几个小时才能执行。
请注意,此示例还使用了 iterators
包中的 idiv
函数来增加每个任务的工作量。这种技术称为分块,通常可以提高并行性能。但是,使用 idiv 会弄乱任务索引,因为变量 i 现在是每个任务索引而不是全局索引。对于全局索引,您可以编写一个包装 idiv
的自定义迭代器:
idivix <- function(n, chunkSize) {
i <- 1
it <- idiv(n, chunkSize=chunkSize)
nextEl <- function() {
m <- nextElem(it) # may throw 'StopIterator'
value <- list(i=i, m=m)
i <<- i + m
value
}
obj <- list(nextElem=nextEl)
class(obj) <- c('abstractiter', 'iter')
obj
}
此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:
r <-
foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
i
}))
}
当然,如果任务的计算强度足够大,您可能不需要分块,并且可以像原始示例一样使用简单的 foreach 循环。
关于r - doParallel(包)foreach 不适用于 R 中的大迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37750937/