apache-spark - Spark groupBy vs repartition 加 mapPartitions

标签 apache-spark apache-spark-sql apache-spark-dataset

我的数据集大约有 2000 万行,需要大约 8 GB 的 RAM。我正在使用 2 个执行程序运行我的工作,每个执行程序 10 GB RAM,每个执行程序 2 个内核。由于进一步的转换,数据应该一次性缓存。

我需要根据 4 个字段减少重复项(选择任何重复项)。两个选项:使用 groupBy 以及使用 repartitionmapPartitions。第二种方法允许您指定分区数,因此在某些情况下可以执行得更快,对吧?

您能否解释一下哪个选项的性能更好?这两个选项是否具有相同的 RAM 消耗?

使用 groupBy

dataSet
    .groupBy(col1, col2, col3, col4)
    .agg(
        last(col5),
        ...
        last(col17)
    );

使用 repartitionmapPartitions

dataSet.sqlContext().createDataFrame(
    dataSet
        .repartition(parallelism, seq(asList(col1, col2, col3, col4)))
        .toJavaRDD()
        .mapPartitions(DatasetOps::reduce),
    SCHEMA
);

private static Iterator<Row> reduce(Iterator<Row> itr) {
    Comparator<Row> comparator = (row1, row2) -> Comparator
        .comparing((Row r) -> r.getAs(name(col1)))
        .thenComparing((Row r) -> r.getAs(name(col2)))
        .thenComparingInt((Row r) -> r.getAs(name(col3)))
        .thenComparingInt((Row r) -> r.getAs(name(col4)))
        .compare(row1, row2);

    List<Row> list = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
        .collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));

    return list.iterator();
}

最佳答案

The second approach allows you to specify num of partitions, and could perform faster because of this in some cases, right?

不是真的。这两种方法都允许您指定分区数 - 在第一种情况下通过 spark.sql.shuffle.partitions

spark.conf.set("spark.sql.shuffle.partitions", parallelism)

然而,如果重复很常见,第二种方法本质上效率较低,因为它先洗牌,然后减少,跳过 map-side 减少(换句话说,它是另一种 gr​​oup-by-key)。如果重复很少见,这不会有太大区别。

旁注 Dataset 已经提供了 dropDuplicates variants ,它采用一组列,first/last 在这里没有特别的意义(参见 How to select the first row of each group? 中的讨论)。

关于apache-spark - Spark groupBy vs repartition 加 mapPartitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54216747/

相关文章:

apache-spark - 为什么我的任务成功率高于 Spark UI 中的任务总数?

apache-spark - 当下一个 RDD 物化时,前一个 RDD 会发生什么?

apache-spark - Spark : Prevent shuffle/exchange when joining two identically partitioned dataframes

hadoop - s3 上的 Spark 数据集 Parquet 分区创建临时文件夹

java - 如何在 Java 中将 DataFrame 转换为 Apache Spark 中的数据集?

java - Spark CSV - 找不到实际参数的适用构造函数/方法

scala - Spark File Streaming 获取文件名

apache-spark - 如何使用 Helm 图表在K8S上产生 Spark

apache-spark - Spark DataFrames 中的 argmax : how to retrieve the row with the maximum value

apache-spark - 分析异常 : u'Cannot resolve column name