python - (pyspark数据帧跨多个ID重新分区

标签 python apache-spark pyspark partitioning

我有一张巨大的 table ,我的 Spark 工作一直崩溃。我想重新分区。我有两个变量(idtime),我需要确保具有给定 id 的所有行都将分配给同一个工作人员。但我有数亿个唯一ID。 我希望 pyspark 均匀地分布数据,但考虑到对于给定的 ID,所有行都应该位于一个工作人员上。我可以简单地这样做吗:

df.repartition("id")

documentation ,似乎是这样建议的。但我想知道 Spark 现在是否会将作业划分为数亿个子集,并且一次只向每个工作人员发送一个子集(即一个 id 的数据)。这当然是非常低效的。

我使用的是 Spark 2.4.0-cdh6.2.1

最佳答案

让我们使用explain看看当您调用repartition时spark会做什么:

>>> spark.range(20).repartition("id").explain()
== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- *(1) Range (0, 20, step=1, splits=8)

Exchange hashpartitioning(id#0L, 200)意味着将数据打乱到 200 个分区。行最终所在的分区是通过 id.hashCode() % 200 确定的。如果您的数据没有偏差,则分布应该相当均匀。 200 是 spark.sql.shuffle.partitions 的默认值决定shuffle后生成多少个分区。要将该值更改为 400,您可以更改正在执行的配置的值 spark.conf.set("spark.sql.shuffle.partitions", 400)或者repartition(400, "id") 。事实上,如果您有大量数据,200 个可能还不够。

关于python - (pyspark数据帧跨多个ID重新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69584580/

相关文章:

python - 可以在 Python 中更快地进行可变长度迭代的简单计算吗?

python - 检查用户是否有discord.py的权限

java - 使用带有 ScalaObjectMapper 的 Jackson 模块在 Spark 1.4.0 上运行作业时出错

scala - 如何检测 Spark DataFrame 是否有列

python - 如何对 PySpark DataFrame 进行花式索引?

apache-spark - spark.sql 与 SqlContext

Python - 如何将计算函数与 pandas groupby 一起使用?

scala - Spark RDD : filling inregular time series

python - 如何自动从数据框列进行自然对数计算?

python - pygame:自定义类继承自 pygame.Surface