apache-spark - 如何将 Prefect 的资源管理器与 Spark 集群结合使用

标签 apache-spark pyspark prefect

我一直在用 Prefect 进行工作流程管理,但被困住了 使用 Prefect 的资源管理器建立和停止 Spark session 。

我浏览了 Prefects 文档,并提供了 Dusk 的示例:

from prefect import resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def init(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()
        
        
with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)        

但是我无法弄清楚如何通过 Spark session 执行相同的操作。

最佳答案

最简单的方法是在本地模式下使用 Spark:

from prefect import task, Flow, resource_manager

from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()

您的 setup() 方法返回正在管理的资源,而 cleanup() 方法接受 setup() 返回的相同资源。在本例中,我们创建并返回一个 Spark session ,然后停止它。您不需要 spark-submit 或任何东西(尽管我发现以这种方式管理依赖项有点困难)。

扩大规模变得更加困难,这是我仍在努力解决的问题。例如,Prefect 不知道如何序列化 Spark DataFrame 以进行输出缓存或持久化结果。另外,在将 Dask 执行器与 Spark session 一起使用时必须小心,因为它们无法进行 pickle,因此您必须将执行器设置为使用 scheduler='threads' (请参阅 here ) .

关于apache-spark - 如何将 Prefect 的资源管理器与 Spark 集群结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68857241/

相关文章:

apache-spark - 从简单的json文件创建和显示spark数据框

apache-spark - 当系列到系列(PandasUDFType.SCALAR)可用时,为什么系列迭代器到系列 pandasUDF(PandasUDFType.SCALAR_ITER)的迭代器?

apache-spark - 使用 pyspark 将嵌套的 json 对象插入到 PostgreSQL

python - 你如何安排一些 python 脚本在 Windows PC 上定期运行?

apache-spark - Scala 的 Apache Toree 语法高亮

apache-spark - 如何从迭代器创建Spark RDD?

Python/Pyspark - 计算 NULL、空和 NaN

prefect - 如何使用 prefect "mapped"并行化嵌套循环

docker - 如何在 Prefect 流程中使用自定义 Docker 存储?

json - 我如何在 spark(scala 或 java)中将平面数据帧转换为嵌套的 json