我想让每个 python 工作线程使用 rpy2 启动 R shell。我可以在某种设置阶段执行此操作,就像我假设导入用于后续执行程序任务的 Python 模块时会发生的情况一样吗?例如:
import numpy as np
df.mapPartitions(lambda x: np.zeros(x))
就我而言,我想在每个执行器上启动 R shell 并导入 R 库,如下所示:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
df.mapPartitions(lambda x: rlibrary.rfunc(x))
但我不希望这种情况发生在对 mapPartitions
的调用中,因为这样它就会在任务级别发生,而不是每个执行程序核心发生一次。这种方法有效并且看起来更像下面的示例,但对我来说没有用。
def model(partition):
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
rlibrary.rfunc(partition)
df.mapPartitions(model)
最佳答案
像这样的东西应该可以正常工作:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
def length_(s):
stringi = importr("stringi")
return stringi.stri_length(s)[0]
sc.parallelize(["foo", "bar", "foobar"]).map(length_)
R
object ,代表 R 解释器,is a singleton所以它只会被初始化一次,并且 R 不会重新导入已经附加的库。多次调用 require
会产生一些开销,但与将数据传入和传出 R 的成本相比,它应该可以忽略不计。
如果您想要更复杂的东西,您可以创建自己的 singleton module或使用Borg pattern处理进口,但这可能有点矫枉过正。
I assume this would happen when you import a python module to be used for later executor tasks
这实际上取决于配置。默认情况下,Spark 在任务之间重用解释器,但可以修改此行为。
我提供了一些示例作为 In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map 的答案。也许您会发现这些很有用。
关于python - 我可以在设置过程中将外部 (R) 进程连接到每个 pyspark 工作线程吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34645130/