python - PySpark 分发模块导入

标签 python apache-spark pyspark

在过去的几天里,我一直在努力了解 Spark 执行程序如何知道如何在导入时通过给定名称使用模块。我正在研究 AWS EMR。情况: 我通过键入在 EMR 上初始化 pyspark

pyspark --master yarn

然后,在 pyspark 中,

import numpy as np ## notice the naming

def myfun(x):
    n = np.random.rand(1)
    return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!

我的理解是,当我导入numpy as np时,主节点是唯一通过np导入和识别numpy的节点。但是,对于 EMR 集群(2 个工作节点),如果我在 rdd 上运行 map 函数,驱动程序会将函数发送到工作节点以对列表中的每个项目(每个分区)执行该函数,并且返回成功结果。

我的问题是: worker 们怎么知道应该将 numpy 导入为 np?每个工作人员都已经安装了 numpy,但我没有明确定义每个节点导入模块 as np 的方法。

有关依赖项的更多详细信息,请参阅 Cloudera 的以下帖子: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Complex Dependency 下,他们有一个示例(代码),其中在每个节点上显式导入了 pandas 模块。

我听说的一个理论是驱动程序分发在 pyspark 交互式 shell 中传递的所有代码。我对此持怀疑态度。我提出来反驳这个想法的例子是,如果我在主节点上键入:

print "hello"

是否每个工作节点都在打印“hello”?我不这么认为。但也许我错了。

最佳答案

当函数被序列化时,有一个 number of objects is being saved :

  • 代码
  • 全局变量
  • 默认值
  • closure
  • 字典

稍后可用于恢复给定功能所需的完整环境。

因为 np 被函数引用,所以可以从它的代码中提取:

from pyspark.cloudpickle import CloudPickler

CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}

和绑定(bind)可以从它的globals中提取:

myfun.__globals__['np']
## <module 'numpy' from ...

因此序列化闭包(广义上)捕获了恢复环境所需的所有信息。当然,在闭包中访问的所有模块都必须在每台工作机器上都是可导入的。

其他一切都只是读写机器。

在旁注中,主节点不应执行任何 Python 代码。它负责资源分配而不是运行应用程序代码。

关于python - PySpark 分发模块导入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38838465/

相关文章:

python - Pyspark Invalid Input Exception try except 错误

apache-spark - 优化从 s3 存储桶中的分区 Parquet 文件读取

apache-spark - 无法使用 Spark 连续流处理数据

python - 当值与pyspark中字符串的一部分匹配时过滤df

python - AppEngine : No module named pyasn1. 兼容.binary

apache-spark - java.net.BindException : Address already in use while using Google DataProc

python - DRF - 编写双嵌套序列化程序的更好方法

Elasticsearch-hadoop & Elasticsearch-spark sql - 语句扫描和滚动的跟踪

python - PyCharm的docstring模板有什么用?我如何有效地使用它?

java - 如何在Python中检查Windows中 "start"命令启动的程序的输出?