python - 如何在 Debug模式下调用PySpark?

标签 python python-2.7 hadoop intellij-idea apache-spark

我已使用Apache Spark 1.4设置了IntelliJ IDEA。

我希望能够将调试点添加到我的Spark Python脚本中,以便可以轻松调试它们。

我目前正在运行这段Python来初始化Spark过程

proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)

if VERBOSE:
    print proc.stdout.read()
    print proc.stderr.read()

spark-submit最终调用myFirstSparkScript.py时, Debug模式未启用,它会正常执行。不幸的是,编辑Apache Spark源代码并运行自定义副本不是可接受的解决方案。

有谁知道是否有可能在 Debug模式下通过spark-submit调用Apache Spark脚本?如果是这样,怎么办?

最佳答案

据我了解,您的意图在Spark架构下无法直接实现。即使没有subprocess调用,程序中唯一可以直接在驱动程序上访问的部分就是SparkContext。与其他组件相比,您可以有效地隔离在不同的通信层中,包括至少一个(以本地模式)JVM实例。为了说明这一点,让我们使用PySpark Internals documentation中的图表。

enter image description here

左侧框中的内容是本地可访问的部分,可用于附加调试器。由于它仅限于JVM调用,因此实际上没有任何您感兴趣的东西,除非您实际上在修改PySpark本身。

正确的事情是远程发生的,并且从您的 Angular 来看,取决于您使用的群集管理器,这几乎是一个黑匣子。此外,在许多情况下,右侧的Python代码只需要调用JVM API就可以。

这是最糟糕的部分。优点是,大多数时候不需要远程调试。除了访问TaskContext之类的对象(可以轻松模拟)之外,代码的每个部分都应该可以在本地轻松地运行/测试,而无需使用任何Spark实例。

传递给操作/转换的函数采用标准且可预测的Python对象,并且期望它们也返回标准Python对象。重要的是这些应该没有副作用

因此,总而言之,您必须完成部分程序-可以交互访问并仅基于输入/输出和“计算核心”进行测试的薄层,而无需使用Spark进行测试/调试。

其他选择

话虽如此,您在这里并没有完全超出选择范围。

本地模式

(将调试器被动地附加到正在运行的解释器上)

普通的GDB和PySpark调试器都可以连接到正在运行的进程。仅在启动PySpark守护进程和/或工作进程后才能执行此操作。在本地模式下,可以通过执行虚拟操作来强制执行此操作,例如:

sc.parallelize([], n).count()

其中n是在本地模式下可用的许多“核心”(local[n])。在类Unix的系统上分步进行的示例过程:
  • 启动PySpark shell:
    $SPARK_HOME/bin/pyspark 
    
  • 使用pgrep来检查没有守护进程在运行:
    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    ➜  spark-2.1.0-bin-hadoop2.7$
    
  • 在PyCharm中可以通过以下方法确定同一件事:

    alt + shift + a并选择附加到本地进程:

    enter image description here

    或运行->附加到本地进程。

    在这一点上,您应该只看到PySpark shell(可能还有一些不相关的进程)。

    enter image description here
  • 执行虚拟操作:

    sc.parallelize([],1).count()
  • 现在您应该同时看到daemonworker(这里只有一个):
    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    13990
    14046
    ➜  spark-2.1.0-bin-hadoop2.7$
    



    enter image description here
    pid较低的进程是一个守护进程,pid较高的进程是(临时)临时进程。
  • 此时,您可以将调试器附加到感兴趣的进程:
  • 在PyCharm中,选择要连接的过程。
  • 通过调用以下普通GDB:
    gdb python <pid of running process>
    

  • 这种方法的最大缺点是您在正确的时间找到了正确的解释器。

    分布式模式

    (使用连接到调试器服务器的 Activity 组件)

    与PyCharm

    PyCharm提供了Python Debug Server,可以与PySpark作业一起使用。

    首先,您应该为远程调试器添加一个配置:
  • alt + shift + a并选择“编辑配置”或“运行”->“编辑配置”。
  • 单击添加新配置(绿色加号),然后选择Python Remote Debug。
  • 根据您自己的配置配置主机和端口(确保从远程计算机访问该端口)

    enter image description here
  • 启动调试服务器:

    Shift + F9

    您应该看到调试器控制台:

    enter image description here
  • 通过安装或分发pyddev文件,确保在工作程序节点上可访问egg
  • pydevd 使用一个 Activity 组件,该组件必须包含在您的代码中:
    import pydevd
    pydevd.settrace(<host name>, port=<port number>)
    

    棘手的部分是找到合适的位置,除非您调试批处理操作(如传递给mapPartitions的函数),否则可能需要修补PySpark源代码本身,例如pyspark.daemon.workerRDD方法(例如RDD.mapPartitions)。假设我们对调试工作人员行为感兴趣。可能的补丁可能如下所示:
    diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
    index 7f06d4288c..6cff353795 100644
    --- a/python/pyspark/daemon.py
    +++ b/python/pyspark/daemon.py
    @@ -44,6 +44,9 @@ def worker(sock):
         """
         Called by a worker process after the fork().
         """
    +    import pydevd
    +    pydevd.settrace('foobar', port=9999, stdoutToServer=True, stderrToServer=True)
    +
         signal.signal(SIGHUP, SIG_DFL)
         signal.signal(SIGCHLD, SIG_DFL)
         signal.signal(SIGTERM, SIG_DFL)
    

    如果决定修补Spark源,请确保使用$SPARK_HOME/python/lib中的修补源而非打包版本。
  • 执行PySpark代码。返回调试器控制台,并玩得开心:

    enter image description here

  • 其他工具

    有很多工具,包括python-manhole pyrasite ,可以与PySpark一起使用,但需要付出一些努力。

    注意:

    当然,您可以在本地模式下使用“远程”( Activity )方法,在某种程度上可以在分布式模式下使用“本地”方法(您可以连接到工作程序节点,并按照与本地模式相同的步骤进行操作)。

    关于python - 如何在 Debug模式下调用PySpark?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31245083/

    相关文章:

    c# - 理解并创建 Python 协议(protocol)

    python - 断言整数在范围内

    python - 使用用户定义的键返回无的列表排序

    hadoop - NodeManager和ResourceManager没有运行

    hadoop - 如何从多个现有表填充新配置单元表的列?

    python - 解析问题

    Python3关于循环迭代多变量简单算法

    java - 在 Java 上使用 DFSClient 将文件上传到 HDFS

    python - 我在 "TypeError: str() takes at most 1 argument (2 given)"变量处收到此错误 "client_response"

    python - 在opencv python中每秒处理一帧