我正在使用 spark sql 对我的数据集运行查询。查询的结果很小,但仍然是分区的。
我想合并生成的 DataFrame 并按列对行进行排序。我试过
DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")
我也试过
DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")
输出文件按块排序(即分区已排序,但数据帧未作为整体排序)。例如,代替
1, value
2, value
4, value
4, value
5, value
5, value
...
我得到
2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
最佳答案
我想在这里提几件事。
1- 源代码显示 orderBy 语句在内部调用排序 api,全局排序设置为 true 。因此,输出级别缺少排序表明在写入目标时排序丢失。我的观点是对 orderBy 的调用总是需要全局顺序。
2- 使用激烈的合并,就像在你的情况下强制单个分区一样,可能真的很危险。我建议你不要这样做。源代码表明,调用 coalesce(1) 可能会导致上游转换使用单个分区。这将是残酷的表现。
3- 您似乎希望 orderBy 语句在单个分区中执行。我不认为我同意这种说法。这将使 Spark 成为一个非常愚蠢的分布式框架。
社区请让我知道您是否同意或不同意这些陈述。
无论如何,您如何从输出中收集数据?
也许输出实际上包含已排序的数据,但是您为了从输出中读取而执行的转换/操作是造成订单丢失的原因。
关于apache-spark - SparkSQL DataFrame 跨分区排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31736519/