python Spark 替代非常大的数据爆炸

标签 python arrays apache-spark count

我有一个像这样的数据框:

df = spark.createDataFrame([(0, ["B","C","D","E"]),(1,["E","A","C"]),(2, ["F","A","E","B"]),(3,["E","G","A"]),(4,["A","C","E","B","D"])], ["id","items"])

创建一个数据框df像这样:

+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+ 

我想得到这样的结果:

+---+-----+
|all|count|
+---+-----+
|  F|    1|
|  E|    5|
|  B|    3|
|  D|    2|
|  C|    3|
|  A|    4|
|  G|    1|
+---+-----+

本质上只是查找 df["items"] 中的所有不同元素并计算它们的频率。如果我的数据大小更易于管理,我会这样做:

all_items = df.select(explode("items").alias("all")) 
result = all_items.groupby(all_items.all).count().distinct() 
result.show()

但是因为我的数据有数百万行,每个列表中有数千个元素,所以这不是一个选项。我正在考虑逐行执行此操作,这样我一次只处理 2 个列表。因为大多数元素经常在多行中重复(但每行中的列表是一个集合),所以这种方法应该可以解决我的问题。但问题是,我真的不知道如何在 Spark 中做到这一点,因为我才刚刚开始学习它。请问有人可以帮忙吗?

最佳答案

您需要做的是减少爆炸分区的大小。有 2 个选项可以执行此操作。首先,如果您的输入数据是可拆分的,您可以减小 Spark.sql.files.maxPartitionBytes 的大小,以便 Spark 读取更小的拆分。另一种选择是在爆炸之前重新分区。

default value maxPartitionBytes 的大小为 128MB,因此 Spark 将尝试以 128MB block 的形式读取数据。如果数据不可分割,那么它会将整个文件读入单个分区,在这种情况下,您需要进行重新分区

在您的情况下,由于您正在进行爆炸,假设每个分区增加 128MB,增加了 100 倍,那么最终每个分区将增加 12GB+!

您可能需要考虑的另一件事是您的随机分区,因为您正在进行聚合。同样,您可能需要通过将 Spark.sql.shuffle.partitions 设置为高于默认 200 的值来增加爆炸后聚合的分区。您可以使用 Spark UI 查看您的随机播放阶段,查看每个任务读取了多少数据并进行相应调整。

我在talk中讨论了这个和其他调整建议。我刚刚在欧洲 Spark 峰会上发表了讲话。

关于python Spark 替代非常大的数据爆炸,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52777421/

相关文章:

arrays - 如何在VBA的“即时”窗口中打印二维数组?

PHP在多维数组中找到第二大元素值

scala - Spark : Caching an RDD/DF for use across multiple programs

scala - 如何将两列合并到一个新的 DataFrame 中?

python - python中的Celery构建微服务

python - 使用 dill 库保存和加载 neupy 算法可以在同一时间段返回不同的预测吗?

Python:用特定词提取句子

python - 使用更新的请求参数进行重定向

c - 在至少出现 3 次的数组中查找最频繁的元素

apache-spark - 在 Spark 中设置 "spark.memory.storageFraction"不起作用