scala - 如何计算由 Spark 中的 (Key, [Value]) 对组成的 RDD 中每对的平均值?

标签 scala apache-spark

我对 Scala 和 Spark 都很陌生,所以如果我完全错误地解决了这个问题,请原谅我。导入一个csv文件,过滤,映射后;我有一个 RDD,它是一堆 (String, Double) 对。

(b2aff711,-0.00510)
(ae095138,0.20321)
(etc.)

当我在 RDD 上使用 .groupByKey() 时,
val grouped = rdd1.groupByKey()

获得带有一堆 (String, [Double]) 对的 RDD。 (我不知道 CompactBuffer 是什么意思,也许会导致我的问题?)
(32540b03,CompactBuffer(-0.00699, 0.256023))
(a93dec11,CompactBuffer(0.00624))
(32cc6532,CompactBuffer(0.02337, -0.05223, -0.03591))
(etc.)

一旦它们被分组,我就会尝试取平均值和标准偏差。我想简单地使用 .mean() 和 .sampleStdev()。当我尝试创建一个新的 RDD 方法时,
val mean = grouped.mean()

返回错误

Error:(51, 22) value mean is not a member of org.apache.spark.rdd.RDD[(String, Iterable[Double])]

val mean = grouped.mean( )



我已经导入了 org.apache.spark.SparkContext._
我还尝试使用 sampleStdev( )、.sum( )、.stats( ) 获得相同的结果。不管是什么问题,它似乎都影响了所有的数字 RDD 操作。

最佳答案

让我们考虑以下几点:

val data = List(("32540b03",-0.00699), ("a93dec11",0.00624),
                ("32cc6532",0.02337) , ("32540b03",0.256023),
                ("32cc6532",-0.03591),("32cc6532",-0.03591))

val rdd = sc.parallelize(data.toSeq).groupByKey().sortByKey()

计算每对均值的一种方法如下:

您需要定义一个平均方法:
def average[T]( ts: Iterable[T] )( implicit num: Numeric[T] ) = {
   num.toDouble( ts.sum ) / ts.size
}

您可以在 rdd 上应用您的方法,如下所示:
val avgs = rdd.map(x => (x._1, average(x._2)))

您可以检查:
avgs.take(3)

这是结果:
res4: Array[(String, Double)] = Array((32540b03,0.1245165), (32cc6532,-0.016149999999999998), (a93dec11,0.00624))

关于scala - 如何计算由 Spark 中的 (Key, [Value]) 对组成的 RDD 中每对的平均值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30920057/

相关文章:

scala - 在 Slick 中创建一个空查询

java - Kafka - 关闭(kafka.server.KafkaServer),启动 Kafka-Server-Start 时出现问题

apache-spark - 什么时候应该在 Spark 编程中使用 groupByKey API?

performance - spark.sql.shuffle.partitions 和 spark.default.parallelism 有什么区别?

hadoop - 如何在Spark SQL中处理 'NULL'值?

scala - Spark : Why execution is carried by a master node but not worker nodes?

scala - 为什么cats在评估Reader时返回 `Id[T]`?

scala - 从 Scala 中的列表返回一个元素

apache-spark - Spark 流 : long queued/active batches

apache-spark - Spark on Databricks - 缓存 Hive 表