在 dmapply(ddR 包)中运行聚合函数

标签 r dataframe parallel-processing aggregate distributed-computing

我想运行 aggregate dmapply 内的函数通过 ddR 提供的功能包裹。

预期结果

所需的结果反射(reflect)了通过 aggregate 生成的简单输出在基地:

aggregate(
  x = mtcars$mpg,
  FUN = function(x) {
    mean(x, na.rm = TRUE)
  },
  by = list(trans = mtcars$am)
)

它产生:
  trans        x
1     0 17.14737
2     1 24.39231

尝试 - ddmapply

我想在使用 ddmapply 时达到相同的结果,如下尝试:
# ddR
require(ddR)

# ddR object creation
distMtcars <- as.dframe(mtcars)

# Aggregate / ddmapply
dmapply(
  FUN = function(x, y) {
    aggregate(FUN = mean(x, na.rm = TRUE),
              x = x,
              by = list(trans = y))
  },
  distMtcars$mpg,
  y = distMtcars$am,
  output.type = "dframe",
  combine = "rbind"
)

代码失败:

Error in match.fun(FUN) : 'mean(x, na.rm = TRUE)' is not a function, character or symbol Called from: match.fun(FUN)



更新

修复 @Mike 指出的错误消除错误,但是不会产生预期的结果。编码:
# Avoid namespace conflict with other packages
ddR::collect(
  dmapply(
    FUN = function(x, y) {
      aggregate(
        FUN = function(x) {
          mean(x, na.rm = TRUE)
        },
        x = x,
        by = list(trans = y)
      )
    },
    distMtcars$mpg,
    y = distMtcars$am,
    output.type = "dframe",
    combine = "rbind"
  )
)

产量:
[1] trans x    
<0 rows> (or 0-length row.names)

最佳答案

如果您将聚合函数更改为与您之前调用的函数一致,它对我来说很好用:FUN = function(x) mean(x, na.rm = T) .找不到原因mean(x, na.rm = T)是因为它不是一个函数(它是一个函数调用),而是 mean是一个函数。

它也会给你 NA结果除非你改变你的 x = distMtcars$mpgx = collect(distMtcars)$mpg . y 也一样。综上所述,我认为这应该适合您:

res <-dmapply(
  FUN = function(x, y) {
    aggregate(FUN = function(x) mean(x, na.rm = TRUE),
              x = x,
              by = list(trans = y))
  },
  x = list(collect(distMtcars)$mpg),
  y = list(collect(distMtcars)$am),
  output.type = "dframe",
  combine = "rbind"
)

那么你可以做collect(res)看看结果。
collect(res)
#  trans        x
#1     0 17.14737
#2     1 24.39231

关于在 dmapply(ddR 包)中运行聚合函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43870520/

相关文章:

r - 如何在 R 中以不同颜色在一个图上绘制多个 ECDF

python - Pandas - 从行数中获取汇总数据框

r - 从 R 中的时间数据中提取间隔

python - Pandas:使用自定义函数重新索引

java - 使用java的stream api进行前馈计算

python - 在 Python 中并行处理一组 XML 文件

r - r中的命令if(0)是什么意思?

r - 使用 R 在 Excel 工作表中清理数据

r - 当向量是 R 中列表的一部分时,如何删除字符向量中的字符串?

R 在 HPC MPIcluster 上运行 foreach dopar 循环