scala - 我们如何对 Spark RDD 中的数据进行排序和分组?

标签 scala sorting apache-spark scala-collections rdd

data.csv 文件中的数据是:

07:36:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:33:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:34:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:35:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:44:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:45:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:46:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:47:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:48:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:36:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:38:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:39:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:50:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:51:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:52:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:53:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:54:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:40:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48

这是我的代码:

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._

 object ScalaApp {
 def main(args: Array[String]) {
 val sc = new SparkContext("local[4]", "Program")

     // we take the raw data in CSV format and convert it into a

  val data = sc.textFile("data.csv")
 .map(line => line.split(","))

 .map(GroupRecord => (GroupRecord(0),
GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))

val numPurchases = data.count()
val d1=data.groupByKey(GroupRecord(2)) // here is the error

println("No: " + numPurchases)
println("Grouped Data" + d1)

}
}

我只想要按源 IP(第二列)分组并按时间排序(第一列)的相同数据。 所以我需要的数据是:

  07:33:00 PM   172.20.16.105   104.70.250.141  80  57188   0.66
  07:34:00 PM   172.20.16.105   104.70.250.141  80  57188   0.47
  07:35:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:37:00 PM   172.20.16.105   104.70.250.141  80  57188   0.66
  07:38:00 PM   172.20.16.105   104.70.250.141  80  57188   0.47
  07:39:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:40:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:44:00 PM   172.20.16.106   104.70.250.141  80  57188   0.49
  07:45:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:46:00 PM   172.20.16.106   104.70.250.141  80  57188   0.33
  07:47:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:50:00 PM   172.20.16.106   104.70.250.141  80  57188   0.49
  07:51:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:52:00 PM   172.20.16.106   104.70.250.141  80  57188   0.33
  07:53:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:54:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:36:00 PM   172.20.16.107   104.70.250.141  80  57188   0.48
  07:37:00 PM   172.20.16.107   104.70.250.141  80  57188   0.48

但是我的代码有问题,所以请帮助我!

最佳答案

您的问题是您的第二个 map 创建了一个 Tuple6 而不是键值对,如果您想要执行 xxxByKey 操作,则需要键值对。如果您想按第二列进行分组,则应将 GroupRecord(1) 设置为您的键和其余值,然后调用 groupByKey,如下所示:

data
  .map(GroupRecord => (GroupRecord(1),(GroupRecord(0),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
  .groupByKey()

关于scala - 我们如何对 Spark RDD 中的数据进行排序和分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32839231/

相关文章:

python - 在 Apache Spark 中使用 pyspark 转置 Dataframe

regex - 如何将一个字符串与另一个之间有空格的字符串进行比较

java - 是否有类型良好的 Scala(或 Java)库来使用 JSON Web API?

c++ - c/c++ 获取添加到排序数组中的元素的索引

c++ - 冒泡排序功能在数组中途停止工作

apache-spark - 检查spark中矩阵每列中唯一值的数量

python - 如何向 pyspark 中的行添加值?

scala - Apache Spark Dataframe Groupby agg() 用于多列

斯卡拉测试 : Auto-skip tests that exceed timeout

javascript - JS - 在纯 JS 中一定数量后隐藏 div