scala - 使用 Spark Scala 计算平均值

标签 scala apache-spark join

如何使用以下两个数据集计算 Spark Scala 中每个位置的平均工资?

File1.csv(第4列是工资)

Ram, 30, Engineer, 40000  
Bala, 27, Doctor, 30000  
Hari, 33, Engineer, 50000  
Siva, 35, Doctor, 60000

File2.csv(第 2 列是位置)

Hari, Bangalore  
Ram, Chennai  
Bala, Bangalore  
Siva, Chennai  

以上文件未排序。需要加入这两个文件并找到每个位置的平均工资。我尝试使用以下代码但无法成功。

val salary = sc.textFile("File1.csv").map(e => e.split(","))  
val location = sc.textFile("File2.csv").map(e.split(","))  
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))  
val joinedData = joined.sortByKey()  
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))  
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))    
aggregatedDF.repartition(1).saveAsTextFile("output.txt")  

请帮助提供代码和示例输出的外观。

非常感谢

最佳答案

您可以将 CSV 文件读取为 DataFrame,然后将它们加入并分组以获得平均值:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
  select("location", "average")

dfAverage.show
+-----------+-------+
|   location|average|
+-----------+-------+
|Bangalore  |40000.0|
|  Chennai  |50000.0|
+-----------+-------+

[更新]用于计算平均尺寸:

// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200

// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary", "dimensions"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).
  agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"),
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width")
  ).
  select(
    $"location", $"avg_length", $"avg_width",
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
  )

dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+

关于scala - 使用 Spark Scala 计算平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44987336/

相关文章:

apache-spark - Spark Graphx 有像 Gephi 那样的可视化吗

scala - 构造函数应用程序列表

scala - 在 sbt 运行任务中重定向标准输入/标准输出

scala - Spark - 在不打开流的情况下获取 Kafka 的最早和最新偏移量

apache-spark - 是否可以嵌入 Zeppelin Notebook 的 HTML 输出,以便在托管 Notebook 的服务器不活动时可以查看输出?

php - MySQL : LEFT JOIN for 4 tables which are connected to each other

scala - Either模式匹配的替代方案

json - Scala Play框架 'update' json数组

mysql - 选择属性和最大值

mysql - 使用 MySQL 在其他表中搜索并求和