apache-spark - Spark - sortWithInPartitions 排序

标签 apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset

下面是代表员工 in_date 和 out_date 的示例数据集。
我必须获得所有员工的最后一个 in_time。
Spark 在 4 Node 独立集群上运行。
初始数据集:
EmployeeID----in_date----out_date

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 
df.sort(col(in_date).desc()) 之后的数据集:
EmployeeID--in_date-----out_date
1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
df.dropDup(EmployeeID):  
输出 :
EmployeeID----in_date----out_date
1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 
预期数据集:
EmployeeID----in_date----out_date
1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 
但是当我用 sortWithInPartitions 对初始数据集进行排序时并删除了我得到了预期的数据集。
我在这里错过了什么大或小吗?任何帮助表示赞赏。
附加信息 :
在本地模式下使用 Spark 执行 df.sort 时,实现了上述预期输出。
我没有做过任何类型的分区,重新分区。
初始数据集来自底层 Cassandra 数据库。

最佳答案

TL;博士 除非明确保证,否则您永远不应假设 Spark 中的操作将以任何特定顺序执行,尤其是在使用 Spark SQL 时。

你在这里缺少的是洗牌。 dropDuplicates实现相当于:

df.groupBy(idCols).agg(first(c) for c in nonIdCols)

这将被执行为:
  • 部分(“ map 端”)聚合。
  • 洗牌。
  • 最终(“reduce-side”)聚合。

  • 中间 shuffle 引入了不确定性,并且不能保证最终聚合将以任何特定顺序应用。

    The above expected output was achieved when df.sort was executed with Spark in local mode.


    local模式相当简单。在完全分布式模式下,您永远不应该使用它来得出有关 Spark 内部行为的结论。

    when I sorted the Initial Dataset with sortWithInPartitions and deduped I got the expected dataset.



    如果数据之前由 EmployeeID 分区,这将是有意义的。 .在这种情况下,Spark 不需要额外的 shuffle。

    根据描述,您似乎应该使用 How to select the first row of each group? 中所示的解决方案之一。 .

    关于apache-spark - Spark - sortWithInPartitions 排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47579128/

    相关文章:

    scala - 带有可变参数的Spark UDF

    hadoop - Hive是否可以定期将增量数据追加或插入到hdfs的同一表文件中?

    java - 在 RDD 中存储数组的有效方法

    apache-spark - Spark-Csv 写引用模式不起作用

    apache-spark - ORC 文件上的 Spark SQL 不返回正确的架构(列名)

    apache-spark - 如何将 cassandraRow 转换为 Row(apache spark)?

    apache-spark - Cassandra Spark Connector 版本与 Spark 2.2 冲突

    apache-spark - spark - 如何添加字段名称,当 spark 读取没有头的 csv 时

    mysql - 使用 Spark JDBC 创建新的 MySQL 表

    apache-spark - 从具有不同 TTL 的 Spark 在 Cassandra 上批量插入