scala - scala Spark 中的 RDD 过滤器

标签 scala apache-spark

我有一个数据集,我想提取那些在 x 和 y 之间具有(评论/时间)的(评论/文本),例如( 1183334400 < time < 1185926400),

这是我的部分数据:

product/productId: B000278ADA
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A17KXW1PCUAIIN
review/profileName: Mark Anthony "Mark"
review/helpfulness: 4/4
review/score: 5.0
review/time: 1174435200
review/summary: Jobst UltraSheer Knee High Stockings
review/text: Does a very good job of relieving fatigue.

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A9Q3932GX4FX8
review/profileName: Trina Wehle
review/helpfulness: 1/1
review/score: 3.0
review/time: 1352505600
review/summary: Delivery was very long wait.....
review/text: It took almost 3 weeks to recieve the two pairs of stockings .

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: AUIZ1GNBTG5OB
review/profileName: dgodoy
review/helpfulness: 1/1
review/score: 2.0
review/time: 1287014400
review/summary: sizes recomended in the size chart are not real
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.

我的 Spark-Scala 代码:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)
    val conf: Configuration = new Configuration
    conf.set("textinputformat.record.delimiter", "product/title:")
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",     classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input1.map { text => text._2}
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate))))
    filt.saveAsTextFile("data/filter1")
  }
}

但是我的代码不能很好地工作,

如何过滤这些行?

最佳答案

比这简单得多。试试这个:

object test1 
{
  def main(args: Array[String]): Unit = 
  {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)

    def extractDateAndCompare(line: String): Boolean=
    {
        val from = line.indexOf("/time: ") + 7
        val to = line.indexOf("review/text: ") -1
        val date = line.substring(from, to).toLong
        date > startDate && date < endDate
    }

    sc.textFile("data/Electronics.txt")
        .filter(extractDateAndCompare)
        .saveAsTextFile("data/filter1")
  }
}

我通常会找到那些中间辅助方法来让事情变得更清晰。当然,这假设边界日期在某处定义并且输入文件包含格式问题。我这样做是为了保持简单,但添加一个 try、返回一个 Option 子句并使用 flatMap() 可以帮助您避免错误(如果有)。

此外,您的原始文本有点麻烦,您可能想探索 Json、TSV 文件或其他一些替代的、更简单的格式。

关于scala - scala Spark 中的 RDD 过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29750325/

相关文章:

scala - 从可变长度 CSV 到对 RDD 的 Spark 转换

java - 使用 Scala-2.13 将 Java 映射转换为 Java 代码中的 Scala 不可变映射

python - 无法运行基本的 GraphFrames 示例

apache-spark - 加载 Hive 表时 Spark 创建了多少个分区

java - 当我尝试使用 java 从 Spark 中的 json 文件创建 View 时,为什么我得到的行包含空值

list - Scala:从列表[Tuple3]到映射[String,String]

Scala 隐式转换在某些条件下应用,但在其他条件下不应用

scala - 为什么像 "cons[B >: A](v: B)"这样定义的方法接受不是 A 父类(super class)型的类型参数?

hadoop - 尝试在Apache Spark中持久存储到数据库时RDD不起作用

python - 将 Pyspark RDD 拆分为不同的列并转换为 Dataframe