我一直在用 Prefect 进行工作流程管理,但被困住了 使用 Prefect 的资源管理器建立和停止 Spark session 。
我浏览了 Prefects 文档,并提供了 Dusk 的示例:
from prefect import resource_manager
from dask.distributed import Client
@resource_manager
class DaskCluster:
def init(self, n_workers):
self.n_workers = n_workers
def setup(self):
"Create a local dask cluster"
return Client(n_workers=self.n_workers)
def cleanup(self, client):
"Cleanup the local dask cluster"
client.close()
with Flow("example") as flow:
n_workers = Parameter("n_workers")
with DaskCluster(n_workers=n_workers) as client:
some_task(client)
some_other_task(client)
但是我无法弄清楚如何通过 Spark session 执行相同的操作。
最佳答案
最简单的方法是在本地模式下使用 Spark:
from prefect import task, Flow, resource_manager
from pyspark import SparkConf
from pyspark.sql import SparkSession
@resource_manager
class SparkCluster:
def __init__(self, conf: SparkConf = SparkConf()):
self.conf = conf
def setup(self) -> SparkSession:
return SparkSession.builder.config(conf=self.conf).getOrCreate()
def cleanup(self, spark: SparkSession):
spark.stop()
@task
def get_data(spark: SparkSession):
return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
@task(log_stdout=True)
def analyze(df):
word_count = df.groupBy('word').count()
word_count.show()
with Flow("spark_flow") as flow:
conf = SparkConf().setMaster('local[*]')
with SparkCluster(conf) as spark:
df = get_data(spark)
analyze(df)
if __name__ == '__main__':
flow.run()
您的 setup()
方法返回正在管理的资源,而 cleanup()
方法接受 setup()
返回的相同资源。在本例中,我们创建并返回一个 Spark session ,然后停止它。您不需要 spark-submit
或任何东西(尽管我发现以这种方式管理依赖项有点困难)。
扩大规模变得更加困难,这是我仍在努力解决的问题。例如,Prefect 不知道如何序列化 Spark DataFrame 以进行输出缓存或持久化结果。另外,在将 Dask 执行器与 Spark session 一起使用时必须小心,因为它们无法进行 pickle,因此您必须将执行器设置为使用 scheduler='threads'
(请参阅 here ) .
关于apache-spark - 如何将 Prefect 的资源管理器与 Spark 集群结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68857241/