python - 我可以在设置过程中将外部 (R) 进程连接到每个 pyspark 工作线程吗

标签 python r apache-spark pyspark rpy2

我想让每个 python 工作线程使用 rpy2 启动 R shell。我可以在某种设置阶段执行此操作,就像我假设导入用于后续执行程序任务的 Python 模块时会发生的情况一样吗?例如:

import numpy as np

df.mapPartitions(lambda x: np.zeros(x))

就我而言,我想在每个执行器上启动 R shell 并导入 R 库,如下所示:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')

df.mapPartitions(lambda x: rlibrary.rfunc(x))

但我不希望这种情况发生在对 mapPartitions 的调用中,因为这样它就会在任务级别发生,而不是每个执行程序核心发生一次。这种方法有效并且看起来更像下面的示例,但对我来说没有用。

def model(partition):
    import rpy2.robjects as robjects
    from  rpy2.robjects.packages import importr
    rlibrary = importr('testrlibrary')
    rlibrary.rfunc(partition)

df.mapPartitions(model)

最佳答案

像这样的东西应该可以正常工作:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr

def length_(s):
    stringi = importr("stringi")  
    return stringi.stri_length(s)[0]

sc.parallelize(["foo", "bar", "foobar"]).map(length_)

R object ,代表 R 解释器,is a singleton所以它只会被初始化一次,并且 R 不会重新导入已经附加的库。多次调用 require 会产生一些开销,但与将数据传入和传出 R 的成本相比,它应该可以忽略不计。

如果您想要更复杂的东西,您可以创建自己的 singleton module或使用Borg pattern处理进口,但这可能有点矫枉过正。

I assume this would happen when you import a python module to be used for later executor tasks

这实际上取决于配置。默认情况下,Spark 在任务之间重用解释器,但可以修改此行为。

我提供了一些示例作为 In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map 的答案。也许您会发现这些很有用。

关于python - 我可以在设置过程中将外部 (R) 进程连接到每个 pyspark 工作线程吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34645130/

相关文章:

r - 在 facet_wrap 标签上显示注释

apache-spark - Spark:整个数据集中在一个执行器中

hadoop - 使用 spark 跨 hadoop 集群复制数据

python - 我可以使用哪个 IronPython 编辑器为 Tibco Spotfire 控件开发脚本

python - 使用 Django ORM 查询基于日期的值

进程之间的python SharedMemory持久性

python - 无法激活 virtualenv 环境 -- tensorflow

r - 给定一个带有 A 列的 R 数据框,如何创建两个包含 A 的所有有序组合的新列

r - 如何将值列表分配给 R 中的 HashMap ?

c++ - 如果Spark的数据会在堆外缓存,它会有字节级规范吗?