apache-spark - ReduceByKey 以字节数组为键

标签 apache-spark rdd

我想使用 RDD 对 Tuple2<byte[], obj> ,但是byte[]内容相同的s被认为是不同的值,因为它们的引用值不同。

我没有看到任何要传递给自定义比较器的东西。我可以转换 byte[]进入 String使用显式字符集,但我想知道是否有更有效的方法。

最佳答案

自定义比较器是不够的,因为 Spark 使用对象的 hashCode 来组织分区中的键。 (至少 HashPartitioner 会这样做,你可以提供一个可以处理数组的自定义分区器)

包装数组以提供适当的 equalshashCode 应该可以解决这个问题。 轻量级包装器应该可以解决问题:

class SerByteArr(val bytes: Array[Byte]) extends Serializable {
    override val hashCode = bytes.deep.hashCode
    override def equals(obj:Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}

快速测试:

import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead   

val keyable = rdd.map(elem =>  new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100

关于apache-spark - ReduceByKey 以字节数组为键,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30785615/

相关文章:

java - Spark-通过java代码提交

java - 带有 OpenJDK 11 且没有 Spring Context 的 AspectJ 1.9.4 无法作为 Spark 路由上的依赖模块工作

java - Java/Scala 中 Spark-csv 的时间戳解析

scala - 如何使用scala从spark中的RDD获取值

apache-spark - Spark : difference when read in . gz 和 .bz2

python - 在Spark中,RDD是不可变的,那么Accumulators是如何实现的呢?

json - 由于数据类型不匹配而获取 : argument 2 requires integral type error while parsing Json data Spark SQL

apache-spark - Spark Streaming - window() 是否缓存 DStreams?

caching - 持久化/缓存 RDD 上的 Spark RDD 检查点执行 DAG 两次

apache-spark - Spark RDD 是否会出现无法满足不变性的情况?