我已使用Apache Spark 1.4设置了IntelliJ IDEA。
我希望能够将调试点添加到我的Spark Python脚本中,以便可以轻松调试它们。
我目前正在运行这段Python来初始化Spark过程
proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)
if VERBOSE:
print proc.stdout.read()
print proc.stderr.read()
当
spark-submit
最终调用myFirstSparkScript.py
时, Debug模式未启用,它会正常执行。不幸的是,编辑Apache Spark源代码并运行自定义副本不是可接受的解决方案。有谁知道是否有可能在 Debug模式下通过spark-submit调用Apache Spark脚本?如果是这样,怎么办?
最佳答案
据我了解,您的意图在Spark架构下无法直接实现。即使没有subprocess
调用,程序中唯一可以直接在驱动程序上访问的部分就是SparkContext
。与其他组件相比,您可以有效地隔离在不同的通信层中,包括至少一个(以本地模式)JVM实例。为了说明这一点,让我们使用PySpark Internals documentation中的图表。
左侧框中的内容是本地可访问的部分,可用于附加调试器。由于它仅限于JVM调用,因此实际上没有任何您感兴趣的东西,除非您实际上在修改PySpark本身。
正确的事情是远程发生的,并且从您的 Angular 来看,取决于您使用的群集管理器,这几乎是一个黑匣子。此外,在许多情况下,右侧的Python代码只需要调用JVM API就可以。
这是最糟糕的部分。优点是,大多数时候不需要远程调试。除了访问TaskContext
之类的对象(可以轻松模拟)之外,代码的每个部分都应该可以在本地轻松地运行/测试,而无需使用任何Spark实例。
传递给操作/转换的函数采用标准且可预测的Python对象,并且期望它们也返回标准Python对象。重要的是这些应该没有副作用
因此,总而言之,您必须完成部分程序-可以交互访问并仅基于输入/输出和“计算核心”进行测试的薄层,而无需使用Spark进行测试/调试。
其他选择
话虽如此,您在这里并没有完全超出选择范围。
本地模式
(将调试器被动地附加到正在运行的解释器上)
普通的GDB和PySpark调试器都可以连接到正在运行的进程。仅在启动PySpark守护进程和/或工作进程后才能执行此操作。在本地模式下,可以通过执行虚拟操作来强制执行此操作,例如:
sc.parallelize([], n).count()
其中
n
是在本地模式下可用的许多“核心”(local[n]
)。在类Unix的系统上分步进行的示例过程:$SPARK_HOME/bin/pyspark
pgrep
来检查没有守护进程在运行:➜ spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
➜ spark-2.1.0-bin-hadoop2.7$
alt + shift + a并选择附加到本地进程:
或运行->附加到本地进程。
在这一点上,您应该只看到PySpark shell(可能还有一些不相关的进程)。
sc.parallelize([],1).count()
daemon
和worker
(这里只有一个):➜ spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
13990
14046
➜ spark-2.1.0-bin-hadoop2.7$
和
pid
较低的进程是一个守护进程,pid
较高的进程是(临时)临时进程。 gdb python <pid of running process>
这种方法的最大缺点是您在正确的时间找到了正确的解释器。
分布式模式
(使用连接到调试器服务器的 Activity 组件)
与PyCharm
PyCharm提供了Python Debug Server,可以与PySpark作业一起使用。
首先,您应该为远程调试器添加一个配置:
Shift + F9
您应该看到调试器控制台:
pyddev
文件,确保在工作程序节点上可访问egg
。 pydevd
使用一个 Activity 组件,该组件必须包含在您的代码中:import pydevd
pydevd.settrace(<host name>, port=<port number>)
棘手的部分是找到合适的位置,除非您调试批处理操作(如传递给
mapPartitions
的函数),否则可能需要修补PySpark源代码本身,例如pyspark.daemon.worker
或RDD
方法(例如RDD.mapPartitions
)。假设我们对调试工作人员行为感兴趣。可能的补丁可能如下所示:diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 7f06d4288c..6cff353795 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -44,6 +44,9 @@ def worker(sock):
"""
Called by a worker process after the fork().
"""
+ import pydevd
+ pydevd.settrace('foobar', port=9999, stdoutToServer=True, stderrToServer=True)
+
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
如果决定修补Spark源,请确保使用
$SPARK_HOME/python/lib
中的修补源而非打包版本。 其他工具
有很多工具,包括python-manhole或
pyrasite
,可以与PySpark一起使用,但需要付出一些努力。注意:
当然,您可以在本地模式下使用“远程”( Activity )方法,在某种程度上可以在分布式模式下使用“本地”方法(您可以连接到工作程序节点,并按照与本地模式相同的步骤进行操作)。
关于python - 如何在 Debug模式下调用PySpark?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31245083/