regex - 模式匹配-spark scala RDD

标签 regex scala apache-spark pattern-matching rdd

我是来自 R 背景的 Spark 和 Scala 新手。经过对 RDD 的一些转换,我得到了一个类型的 RDD

Description: RDD[(String, Int)]

现在我想在字符串 RDD 上应用正则表达式,并从字符串中提取子字符串,然后在新列中添加子字符串。

输入数据:

BMW 1er Model,278
MINI Cooper Model,248

我正在寻找的输出:

   Input                  |  Brand   | Series      
BMW 1er Model,278,          BMW ,        1er        
MINI Cooper Model ,248      MINI ,      Cooper

其中 Brand 和 Series 是从 String RDD 中新计算的子字符串

到目前为止我做了什么。

我可以使用正则表达式为字符串实现此目的,但我可以应用于所有行。

 val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r //to look for BMW or MINI

然后我就可以使用

brandRegEx.findFirstIn("hello this mini is bmW testing")

但是我如何将它用于 RDD 的所有行并应用不同的正则表达式来实现上述输出。

我读到了这个代码片段,但不知道如何把它放在一起。

val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r

def getBrand(Col4: String) : String = Col4 match {
    case brandRegEx(str)  =>  
    case _ => ""
    return 'substring
}

如有任何帮助,我们将不胜感激!

谢谢

最佳答案

要将正则表达式应用于 RDD 中的每个项目,您应该使用 RDD map 函数,该函数使用某个函数(在本例中是一个部分函数)来转换 RDD 中的每一行提取到组成每一行的元组的两个部分):

import org.apache.spark.{SparkContext, SparkConf}

object Example extends App {

  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example"))

  val data = Seq(
    ("BMW 1er Model",278),
    ("MINI Cooper Model",248))

  val dataRDD = sc.parallelize(data)

  val processedRDD = dataRDD.map{
    case (inString, inInt) =>
      val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
      val brand = brandRegEx.findFirstIn(inString)
      //val seriesRegEx = ...
      //val series = seriesRegEx.findFirstIn(inString)
      val series = "foo"
      (inString, inInt, brand, series)
  }

  processedRDD.collect().foreach(println)
  sc.stop()
}

请注意,我认为您的正则表达式存在一些问题,并且您还需要一个正则表达式来查找该系列。此代码输出:

(BMW 1er Model,278,BMW,foo)
(MINI Cooper Model,248,NOT FOUND,foo)

但是,如果您根据需要更正正则表达式,则可以通过以下方式将它们应用到每一行。

关于regex - 模式匹配-spark scala RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34038904/

相关文章:

java - 在java中使用正则表达式捕获重复的数字组

scala - 有效地为 spark 实现 takeByKey

scala - 在sbt中同一个项目下添加插件

python - PySpark RDD 过滤掉的元素返回

mysql - Project_Bank.csv 不是 Parquet 文件。尾部预期的魔数(Magic Number) [80, 65, 82, 49] 但发现 [110, 111, 13, 10]

python - PySpark 中的聚合

python - 为什么要在 python 正则表达式中使用 re.purge()?

c# - 正则表达式与空格匹配

Python 2.6+ str.format() 和正则表达式

java - 将 Scalatest 包含在 Java 中时 Maven 构建失败