在过去的几天里,我一直在努力了解 Spark 执行程序如何知道如何在导入时通过给定名称使用模块。我正在研究 AWS EMR。情况: 我通过键入在 EMR 上初始化 pyspark
pyspark --master yarn
然后,在 pyspark 中,
import numpy as np ## notice the naming
def myfun(x):
n = np.random.rand(1)
return x*n
rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!
我的理解是,当我导入numpy as np
时,主节点是唯一通过np
导入和识别numpy
的节点。但是,对于 EMR 集群(2 个工作节点),如果我在 rdd 上运行 map 函数,驱动程序会将函数发送到工作节点以对列表中的每个项目(每个分区)执行该函数,并且返回成功结果。
我的问题是: worker 们怎么知道应该将 numpy 导入为 np?每个工作人员都已经安装了 numpy,但我没有明确定义每个节点导入模块 as np
的方法。
有关依赖项的更多详细信息,请参阅 Cloudera 的以下帖子: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/
在 Complex Dependency 下,他们有一个示例(代码),其中在每个节点上显式导入了 pandas 模块。
我听说的一个理论是驱动程序分发在 pyspark 交互式 shell 中传递的所有代码。我对此持怀疑态度。我提出来反驳这个想法的例子是,如果我在主节点上键入:
print "hello"
是否每个工作节点都在打印“hello”?我不这么认为。但也许我错了。
最佳答案
当函数被序列化时,有一个 number of objects is being saved :
- 代码
- 全局变量
- 默认值
- closure
- 字典
稍后可用于恢复给定功能所需的完整环境。
因为 np
被函数引用,所以可以从它的代码中提取:
from pyspark.cloudpickle import CloudPickler
CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}
和绑定(bind)可以从它的globals
中提取:
myfun.__globals__['np']
## <module 'numpy' from ...
因此序列化闭包(广义上)捕获了恢复环境所需的所有信息。当然,在闭包中访问的所有模块都必须在每台工作机器上都是可导入的。
其他一切都只是读写机器。
在旁注中,主节点不应执行任何 Python 代码。它负责资源分配而不是运行应用程序代码。
关于python - PySpark 分发模块导入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38838465/