scala - 线程 # 将 _ GB 的排序数据溢出到磁盘

标签 scala apache-spark

我正在尝试编写一个 ETL 过程,在联合之前合并两个数据集,我向每个数据集添加一列,较新的数据集获取 2,较旧的数据集获取 1,然后如果行具有重复的主键,我会删除该行旧/新列中有 1。我尝试以多种方式编写此内容,最近通过执行以下操作:

orderBy(keys, desc(old/new)).dropDuplicates(keys)

但是在大​​型数据集上,我总是会出现大幅减速,并显示一条消息:

16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:32:47 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:33:02 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:33:18 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:33:33 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:33:49 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:34:04 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:34:19 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:34:35 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:34:50 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:35:06 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:35:21 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:35:37 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:35:52 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:36:07 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:36:23 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:36:38 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:36:53 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:37:09 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:37:24 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:37:40 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:37:55 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:38:10 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:38:25 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:38:41 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:38:56 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)
16/09/21 20:39:25 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:39:45 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:40:05 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:40:26 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:40:46 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:41:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:41:27 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:41:47 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:42:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:42:28 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:42:49 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:43:09 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:43:30 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:43:50 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:44:11 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:44:31 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:44:52 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:45:13 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:45:33 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:45:53 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:46:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:46:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:46:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:47:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:47:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:47:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:48:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:48:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:48:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)

检查 Spark UI 后,只有一个线程正在超时工作,而其余线程已经完成。
enter image description here
是否有可能将其分散到线程之间?

最佳答案

您解决此问题的方式在设计上会放大与数据倾斜相关的任何可能的问题。由于您首先按键和指示变量对数据重新排序,因此您首先对数据进行洗牌,可能会创建高度不平衡的分区。此后应用的任何减少都无法弥补这一点。

至少有两种方法可以用来实现相同的结果,同时充分受益于 map 边缩减。我在 SPARK DataFrame: select the first row of each group 的回答中解释了这两点所以重申一下:

  • 您可以使用struct排序来选择每组的最小/最大行。
  • 您可以将静态类型的DatasetgroupByKey结合使用,后跟reduceGroups

关于scala - 线程 # 将 _ GB 的排序数据溢出到磁盘,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39626429/

相关文章:

apache-spark - 是否有适合多节点集群部署的 Apache Spark 2.0.0 公共(public) Docker 镜像?

scala - Spark UDF 空处理

mongodb - 无法通过 Spark 连接到 Mongo DB

generics - 使用类型级计算时类型推断/类型检查失败

scala - @uncheckedVariance 在 Kotlin 中?

java - 如何使用 target/...jar 执行 scala 对象?

hadoop - RDD 拆分给出缺少的参数类型

scala - Keep组合的含义?

泛型类的 scala 宏泛型字段不适用类泛型类型参数

performance - 我应该把程序放在 HDFS 上吗?