r - 如何在 R 中将 `foreach` 和 `%dopar%` 与 `R6` 类一起使用?

标签 r parallel-processing doparallel parallel-foreach r6

我在尝试使用 %dopar% 时遇到问题和 foreach()连同 R6类(class)。四处搜索,我只能找到两个与此相关的资源,一个未回答的 SO question和一个开放的GitHub issueR6 上存储库。

在一条评论(即 GitHub 问题)中,建议通过重新分配 parent_env 来解决问题。类的 SomeClass$parent_env <- environment() .我想了解 environment() 到底是做什么的引用何时在 SomeClass$parent_env <- environment() 中调用此表达式(即 %dopar%)的 foreach

这是一个最小的可重现示例:

Work <- R6::R6Class("Work",

    public = list(
        values = NULL,


        initialize = function() {
            self$values <- "some values"
        }
    )
)

现在,以下 Task类使用 Work构造函数中的类。

Task <- R6::R6Class("Task",
    private = list(
        ..work = NULL
    ),


    public = list(
        initialize = function(time) {
            private$..work <- Work$new()
            Sys.sleep(time)
        }
    ),


    active = list(
        work = function() {
            return(private$..work)
        }
    )
)

Factory类,Task类被创建并且foreach..m.thread() 中实现.

Factory<- R6::R6Class("Factory",

    private = list(
        ..warehouse = list(),
        ..amount = NULL,
        ..parallel = NULL,


        ..m.thread = function(object, ...) {
            cluster <- parallel::makeCluster(parallel::detectCores() -  1)
            doParallel::registerDoParallel(cluster)

            private$..warehouse <- foreach::foreach(1:private$..amount, .export = c("Work")) %dopar% {
                # What exactly does `environment()` encapsulate in this context?
                object$parent_env <- environment()
                object$new(...) 
            }

            parallel::stopCluster(cluster)
        },


        ..s.thread = function(object, ...) {
            for (i in 1:private$..amount) {
               private$..warehouse[[i]] <- object$new(...)
            }
        },


        ..run = function(object, ...) {
            if(private$..parallel) {
                private$..m.thread(object, ...)
            } else {
                private$..s.thread(object, ...)
            }
        }
    ),


    public = list(
        initialize = function(object, ..., amount = 10, parallel = FALSE) {
            private$..amount = amount
            private$..parallel = parallel

            private$..run(object, ...)
        }
    ),


    active = list(
        warehouse = function() {
            return(private$..warehouse)
        }
    )
)

然后,它被称为:

library(foreach)

x = Factory$new(Task, time = 2, amount = 10, parallel = TRUE)

没有下面这行object$parent_env <- environment() ,它会引发错误(即,如其他两个链接中所述):Error in { : task 1 failed - "object 'Work' not found" .

我想知道,(1) 在分配 parent_env 时有哪些潜在的陷阱?里面foreach (2) 为什么它首先起作用?


更新 1:

  • 我返回了 environment()来自内部 foreach() , 这样 private$..warehouse捕捉那些环境
  • 使用rlang::env_print()在调试 session 中(即 browser() 语句被放置在 foreach 结束执行之后)它们的组成如下:
Browse[1]> env_print(private$..warehouse[[1]])

# <environment: 000000001A8332F0>
# parent: <environment: global>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * ...: <...>

Browse[1]> env_print(environment())

# <environment: 000000001AC0F890>
# parent: <environment: 000000001AC20AF0>
# bindings:
#  * private: <env>
#  * cluster: <S3: SOCKcluster>
#  * ...: <...>

Browse[1]> env_print(parent.env(environment()))

# <environment: 000000001AC20AF0>
# parent: <environment: global>
# bindings:
#  * private: <env>
#  * self: <S3: Factory>

Browse[1]> env_print(parent.env(parent.env(environment())))

# <environment: global>
# parent: <environment: package:rlang>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * .Random.seed: <int>
#  * Factory: <S3: R6ClassGenerator>
#  * Task: <S3: R6ClassGenerator>

最佳答案

免责声明:我在这里所说的很多内容都是基于我所知道的有根据的猜测和推论, 我不能保证一切都是 100% 正确的。

我认为可能会有很多陷阱, 哪一个适用真的取决于你做什么。 我认为你的第二个问题更重要, 因为如果你明白这一点, 您将能够自己评估一些陷阱。

题目比较复杂, 但您可能可以先阅读有关 R's lexical scoping 的内容. 本质上,R 有一种环境层次结构, 当执行 R 代码时, 在当前环境中找不到其值的变量 (这是 environment() 返回的内容) 在父级环境中寻找 (不要与调用者环境混淆)。

根据您链接的 GitHub 问题, R6 生成器将“引用”保存到它们的父环境, 他们希望他们的类可能需要的一切都可以在所述父级或环境层次结构的某个地方找到, 从那个 parent 开始并“向上”。

您使用的变通方法之所以有效,是因为您将生成器的父环境替换为并行工作程序中当前 foreach 调用中的环境 (可能是不同的 R 进程,不一定是不同的线程), 并且,鉴于您的 .export 规范可能会导出必要的值, 然后,R 的词法范围界定可以从单独线程/进程中的 foreach 调用开始搜索缺失值。

对于您链接的具体示例, 我发现一个更简单的方法让它工作 (至少在我的 Linux 机器上) 就是做以下事情:

library(doParallel)

cluster <- parallel::makeCluster(parallel::detectCores() -  1)
doParallel::registerDoParallel(cluster)
parallel::clusterExport(cluster, setdiff(ls(), "cluster"))

x = Factory$new(Task, time = 1, amount = 3)

但是..m.thread 函数保留为:

..m.thread = function(object, amount, ...) {
    private$..warehouse <- foreach::foreach(1:amount) %dopar% {
        object$new(...) 
    }
}

(完成后手动调用 stopCluster)。

clusterExport 调用的语义应该类似于*: 从主 R 进程的全局环境中获取除 cluster 之外的所有内容, 并使其在每个并行 worker 的全局环境中可用。 这样,当词法作用域到达它们各自的全局环境时,foreach 调用中的任何代码都可以使用生成器。 foreach 可以巧妙地自动导出一些变量 (如 GitHub 问题所示), 但它有局限性, 并且在词法作用域期间使用的层次结构会变得非常困惑。

*我说“类似于”是因为我不知道如果使用 fork ,R 到底做了什么来区分(全局)环境, 但由于需要导出, 我假设它们确实彼此独立。

PS:如果您在函数调用中创建 worker,我会调用 on.exit(parallel::stopCluster(cluster)), 这样一来,您就可以避免在出现错误时以某种方式停止进程,直到它们停止运行。

关于r - 如何在 R 中将 `foreach` 和 `%dopar%` 与 `R6` 类一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57349958/

相关文章:

r - 关于R水管工的基本问题

c++ - 通过乱序执行并行化我的自定义编译器

c# - 即使MSDN另有建议,为什么Parallel.ForEach比AsParallel()。ForAll()快得多?

bash - Matlab parfor 使用的内核数少于分配的内核数

r - 并行处理的最佳内核数是多少?

r - 在 R 中执行详尽搜索的最快方法是什么

r - R中函数的语法?

r - 保持 geom_rect 半透明区域,但有彩色轮廓

r - 如何使用 `plot_grid` 包中的 `cowplot` 对齐图排列的最后一列图中包含的图例?

r - 如何在 R 中嵌套 foreach 循环的内循环和外循环之间添加代码