apache-spark - Spark - 如何在 N 个分区上进行计算然后写入 1 个文件

标签 apache-spark pyspark

我想在许多分区上进行计算,以从并行性中受益,然后将结果写入单个文件,可能是 Parquet 文件。我在 PySpark 1.6.0 中尝试的工作流程类似于:

data_df = sqlContext.read.load('my_parquet_file')
mapped_df = sqlContext.createDataFrame(data_df.map(lambda row: changeRow(row)), ['c1', 'c2'])
coalesced_df = mapped_df.coalesce(1)
coalesced_df.write.parquet('new_parquet_file')

但从 Spark 的 Web UI 来看,所有工作(包括 map 部分)都发生在单个线程上。

有没有办法对此进行调整,以便 map 发生在许多分区上,而 write 只发生在 1 个分区上?我尝试过的唯一我认为有效的方法是在 mapcoalesce 之间放置一个 mapped_df.count() ,但是那感觉不是一种令人满意的做法。

最佳答案

Spark 执行惰性求值,这意味着在调用某个操作之前它不会执行任何操作。 writecount 都是会欺骗执行的操作。像 mapfilter 这样的函数很容易在执行某些操作时执行,而不是在执行某些操作之前执行。 p>

现在,您的管道非常简单,您只有一个操作(写入),因此在写入文件时会执行map>。然而,通过调用 coalesce(1),您还告诉 Spark 在执行 write 操作之前将所有数据收集到一个分区中,并且自 mapwrite 操作中执行的操作的一部分,map 也将在一个分区中运行。

我希望这是有道理的。我建议您还阅读一些有关 Spark 如何工作的博客文章。 This one来自 Cloudera,应该会给你一些见解:)

关于apache-spark - Spark - 如何在 N 个分区上进行计算然后写入 1 个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37215810/

相关文章:

scala - Spark通用案例类到结构类型

python - PySpark 无法访问使用 StringIndexer 添加的列

python - 属性错误 : 'DataFrame' object has no attribute '_data'

apache-spark - Spark : Merge 2 dataframes by adding row index/number on both dataframes

python - 通过 Okta 身份验证使用 Databricks Snowflake 连接器连接到 Snowflake

apache-spark - Pyspark 读取包含 csv 的 7z 压缩文件

apache-spark - 如何将字符串中的时间值从 PT 格式转换为秒?

scala - 将行或列转换为数据框

java - Spark 1.3.1 ClassNotFoundException 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)

python - 如何在 PySpark 中创建 merge_asof 功能?