scala - 如何在Spark中按键分区RDD?

标签 scala apache-spark rdd

鉴于 HashPartitioner 文档说:

[HashPartitioner] implements hash-based partitioning using Java's Object.hashCode.



说我要分区 DeviceData由其 kind .
case class DeviceData(kind: String, time: Long, data: String)

RDD[DeviceData] 进行分区是否正确?通过覆盖 deviceData.hashCode()方法并仅使用 kind 的哈希码?

但鉴于 HashPartitioner需要多个分区参数我很困惑我是否需要提前知道种类的数量以及如果种类多于分区会发生什么?

如果我将分区数据写入磁盘,它在读取时将保持分区状态是否正确?

我的目标是打电话
  deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)

并且只有DeviceData的相同 kind迭代器中的值。

最佳答案

做一个groupByKey怎么样?使用 kind .或者另一个 PairRDDFunctions方法。

您让我觉得您并不真正关心分区,只是在一个处理流程中获得所有特定类型?

配对函数允许:

rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
   .foreachPartition(...)

但是,您可能会更安全一点:
rdd.keyBy(_.kind).reduceByKey(....)

mapValues或许多其他配对函数,可确保您将碎片作为一个整体

关于scala - 如何在Spark中按键分区RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32544307/

相关文章:

java.io.InvalidClassException : org. apache.spark.internal.io.HadoopMapReduceCommitProtocol;本地类不兼容

scala - Spark : How to efficiently have intersections preserving duplicates (in Scala)?

python - Pyspark - 将字典列表(piplelinedRDD)扁平化为单个字典并按键按值分组

scala - 使用案例类显示表达式

scala - 如何动态创建列引用?

java - 为什么在 Java 中不需要创建这些 json 读/写?

apache-spark - Spark - 如何在生产中使用训练有素的推荐模型?

apache-spark - 与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃

eclipse - 构建路径和 scala-ide 中 scala lib 版本不同

apache-spark - 通过 thrift 服务器从 Web 浏览器访问 Spark RDDs - java