python - PySpark 代码是在 JVM 还是 Python 子进程中运行?

标签 python apache-spark pyspark

当我使用 python3 t1.py 运行以下名为 t1.py 的脚本时,我想了解幕后发生了什么。具体来说,我有以下问题:

  • 向spark worker节点提交什么样的代码?它是提交给 spark worker 节点的 python 代码还是翻译后的等效 Java 代码?
  • reduce 中的添加操作是否被视为 UDF,从而在工作节点上的 python 子进程中运行?
  • 如果添加操作在工作节点上的 python 子进程中运行,工作 JVM 是否会为要添加的分区中的每个数字与 python 子进程通信?如果是这种情况,则意味着大量开销。
  •     #!/home/python3/venv/bin/python3
        #this file is named t1.py   
        import pyspark
        from pyspark.sql import SparkSession
        from pyspark.sql.types import DecimalType, IntegerType
        import pyspark.sql.functions as F
        from operator import add
        import pandas as pd
        from datetime import datetime
    
        len = int(100000000/1)
        print("len=", len)
        spark = SparkSession.builder.appName('ai_project').getOrCreate()
    
        start = datetime.now()
        t=spark.sparkContext.parallelize(range(len))
        a = t.reduce(add)
        print(a)
        end= datetime.now()
        print("end for spark rdd sum:", end, end-start)
    

    最佳答案

    在 PySpark 中,Python 和 JVM 代码位于不同的操作系统进程中。 PySpark 使用 Py4J,这是一个促进两种语言之间互操作的框架,在 Python 和 JVM 进程之间交换数据。

    当您启动 PySpark 作业时,它作为一个 Python 进程启动,然后生成一个 JVM 实例并在其中运行一些 PySpark 特定代码。然后它在那个 JVM 中实例化一个 Spark session ,它成为 Spark 看到的驱动程序。该驱动程序连接到 Spark master 或产生一个 in-proc ,具体取决于 session 的配置方式。

    当您创建 RDD 或数据帧时,它们就像由 Scala 或 Java 应用程序创建的 RDD 和数据帧一样存储在 Spark 集群的内存中。对它们的转换和操作就像它们在 JVM 中一样工作,但有一个显着区别:任何涉及通过 Python 代码传递数据的东西,都在 JVM 之外运行。因此,如果您创建一个 Dataframe,并执行以下操作:

    df.select("foo", "bar").where(df["foo"] > 100).count()
    

    这完全在 JVM 中运行,因为没有数据必须通过的 Python 代码。另一方面,如果你这样做:
    a = t.reduce(add)
    

    add operator 是 Python 的操作符,RDD 被序列化,然后发送到一个或多个 Python 进程执行归约,然后结果再次序列化并返回到 JVM,最后转移到 Python 驱动进程进行最终归约.

    它的工作方式(涵盖您的 Q1)是这样的:
  • 每个 Spark JVM 执行器都会生成一个运行特殊 PySpark 脚本的新 Python 子进程
  • Python 驱动程序序列化每个 Spark 任务必须执行的字节码(例如,add 运算符)并将其与一些附加数据一起腌制
  • JVM 执行器序列化其 RDD 分区,并将它们与从驱动程序接收的序列化 Python 字节码一起发送到其 Python 子进程
  • Python 代码在 RDD 数据上运行
  • 结果被序列化回并发送到JVM执行器

  • JVM 执行器使用网络套接字与它们产生的 Python 子进程对话,它们启动的特殊 PySpark 脚本运行一个循环,其任务是坐在那里并期望运行序列化数据和字节码。

    关于 Q3,JVM 执行程序将整个 RDD 分区传输到 Python 子进程,而不是单个项目。您应该努力使用 Pandas UDF,因为它们可以被矢量化。

    如果你对细节感兴趣,从 python/pyspark/rdd.py 的源代码开始并查看 RDD类(class)。

    关于python - PySpark 代码是在 JVM 还是 Python 子进程中运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61816236/

    相关文章:

    apache-spark - PySpark,决策树(Spark 2.0.0)

    apache-spark - 两个非常相似的 Spark Dataframe 之间性能差异的可能原因

    python - 我在函数上苦苦挣扎,例如 def a (...)。括号里写什么?

    python - 如何使用其他方法从现有字典创建 Python Enum 类?

    python - 向具有三个独立输入的函数添加输入验证

    apache-spark - Spark : create a nested schema

    apache-spark - worker如何使用standalone Master的资源?

    PySpark 通过 Hive Metastore 读取 Iceberg 表到 S3

    python - matplotlib 强制平移/缩放限制到 x 轴

    Azure Blob 存储 Spark