java - 使用 Apache Spark 重新分区

标签 java scala hadoop apache-spark

问题:我正在尝试对数据集进行重新分区,以便在指定的整数列中具有相同编号的所有行都在同一分区中。

什么是有效的:当我将 1.6 API(在 Java 中)与 RDD 一起使用时,我使用了哈希分区程序,这按预期工作。例如,如果我为每一行打印此列的每个值的模数,我将在给定分区中获得相同的模数(我通过手动读取使用 saveAsHadoopFile 保存的内容来读取分区)。

使用最新的 API 时没有按预期工作

但现在我正在尝试使用 2.0.1 API(在 Scala 中)和具有重新分区方法的数据集,该方法采用多个分区和一列并将此数据集保存为 Parquet 文件。如果我在给定此列的行未分区的分区中查看结果是不一样的。

最佳答案

要保存分区的数据集,您可以使用:

  • DataFrameWriter.partitionBy - 自 Spark 1.6 起可用

    df.write.partitionBy("someColumn").format(...).save()
    
  • DataFrameWriter.bucketBy - 自 Spark 2.0 起可用

    df.write.bucketBy("someColumn").format(...).save()
    

使用 df.partitionBy("someColumn").write.format(...).save 应该也能正常工作,但 Dataset API 不使用哈希码。它使用 MurmurHash,因此结果将不同于 RDD API 中 HashParitioner 的结果,并且琐碎的检查(如您描述的检查)将不起作用。

val oldHashCode = udf((x: Long) => x.hashCode)

// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599
val nonNegativeMode = udf((x: Int, mod: Int) => {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
})

val df = spark.range(0, 10)

val oldPart = nonNegativeMode(oldHashCode($"id"), lit(3))
val newPart = nonNegativeMode(hash($"id"), lit(3))

df.select($"*", oldPart, newPart).show
+---+---------------+--------------------+
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)|
+---+---------------+--------------------+
|  0|              0|                   1|
|  1|              1|                   2|
|  2|              2|                   2|
|  3|              0|                   0|
|  4|              1|                   2|
|  5|              2|                   2|
|  6|              0|                   0|
|  7|              1|                   0|
|  8|              2|                   2|
|  9|              0|                   2|
+---+---------------+--------------------+

一个可能的陷阱是 DataFrame writer 可以合并多个小文件以降低成本,因此来自不同分区的数据可以放在一个文件中。

关于java - 使用 Apache Spark 重新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40586642/

相关文章:

scala - 在 Scala 中存储特定类类型的序列?

macos - 在 Mac 上以伪分布式模式设置 Hadoop

hadoop - 使用Kafka Connect HDFS时,AccessControlException用户=根,访问=写…

java - 如何在 Java 中创建包含随机生成的数字的字符串列表?

java - Spring Boot + JPA + Hibernate CommandAcceptanceException : Error executing DDL

scala - 如何在 Scala 中重命名文件?

hadoop - 带 append 功能的 HDFS 是如何工作的

java - SWT - 如何更改 TabFolder 中现有 TabItems 的顺序

java - oracle 数据库中的连接未关闭

scala - 在 Scala 中以函数方式迭代表填充 2 个 HasSet