scala - 使用其他现有列添加新列 Spark/Scala

标签 scala dataframe apache-spark apache-spark-sql

我想使用其他现有列添加一个新列。这必须在有条件的情况下发布。这是我的数据框的示例:

val data = Seq(("WHT20177", "CTHT WO/MTR# : WHT20212/BTI0426; WHT20177/BTH0393"),
                ("WHT55637", "CTHT WO/MTR# : WHT50747/BTI2699; WHT55637/SQL1239"))

val dataFrame = data.toDF("prev_wo", "ref_wo")

+--------+-------------------------------------------------+
|prev_wo |ref_wo                                           |
+--------+-------------------------------------------------+
|WHT20177|CTHT WO/MTR# : WHT20212/BTI0426; WHT20177/BTH0393|
|WHT55637|CTHT WO/MTR# : WHT50747/BTI2699; WHT55637/SQL1239|
+--------+-------------------------------------------------+

列“ref_wo”必须包含“prev_wo”,仅在这种情况下,我必须将以下元素放入我将命名为“col1”的新列中。

1

对于第一行,要提取的值是"BTH0393",对于第二行,要提取的值是"SQL1239" 我正在使用两种不同的方法在 Spark Scala 中尝试这个。第一个只对第一行有反应,第二个只对第二行有反应。

第一种方法:

def addNewColumn(df: DataFrame): DataFrame = {

  val prev_wo = dataFrame.select("prev_wo").collectAsList().get(0).mkString(",")

  val regex_extract = ("(?<=" + prev_wo + "\\/)(.{7})").r

  df
    .withColumn("col1",
      when($"ref_wo".contains(col("prev_wo")),
        regexp_extract(col("ref_wo"), regex_extract.toString(), 1))
        .otherwise(null)
    )

}

val new_dataFrame = dataFrame
  .transform(addNewColumn)

输出:

+--------+-------------------------------------------------+-------+
|prev_wo |ref_wo                                           |col1   |
+--------+-------------------------------------------------+-------+
|WHT20177|CTHT WO/MTR# : WHT20212/BTI0426; WHT20177/BTH0393|BTH0393|
|WHT55637|CTHT WO/MTR# : WHT50747/BTI2699; WHT55637/SQL1239|       |
+--------+-------------------------------------------------+-------+

第二种方法:

def addColumn(df: DataFrame): DataFrame = {
  var out = df

  df.collect().foreach(row => {

    val prev_wo = row.getValuesMap(Seq("prev_wo")).get("prev_wo").getOrElse("")

    val regex_extract = ("(?<=" + prev_wo + "\\/)(.{7})").r

    out = out
      .withColumn("col1",
        when($"ref_wo".contains(col("prev_wo")),
          regexp_extract(col("ref_wo"), regex_extract.toString(), 1))
          .otherwise(null)
      )
  })

  out
}

val new_dataFrame = dataFrame
  .transform(addColumn)

输出

+--------+-------------------------------------------------+-------+
|prev_wo |ref_wo                                           |col1   |
+--------+-------------------------------------------------+-------+
|WHT20177|CTHT WO/MTR# : WHT20212/BTI0426; WHT20177/BTH0393|       |
|WHT55637|CTHT WO/MTR# : WHT50747/BTI2699; WHT55637/SQL1239|SQL1239|
+--------+-------------------------------------------------+-------+

最佳答案

您可以将 regexp_extract 与从 prev_wo 动态生成的模式一起使用:

dataFrame.withColumn("col1", expr("regexp_extract(ref_wo, concat(prev_wo, '/(.{7})'), 1)")).show(false)
+--------+-------------------------------------------------+-------+
|prev_wo |ref_wo                                           |col1   |
+--------+-------------------------------------------------+-------+
|WHT20177|CTHT WO/MTR# : WHT20212/BTI0426; WHT20177/BTH0393|BTH0393|
|WHT55637|CTHT WO/MTR# : WHT50747/BTI2699; WHT55637/SQL1239|SQL1239|
+--------+-------------------------------------------------+-------+

关于scala - 使用其他现有列添加新列 Spark/Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67506301/

相关文章:

scala - 在 Spark 数据框中创建子字符串列

apache-spark - 如何将 JMXConsole 远程连接到 Spark 流应用程序

scala - 在 Spark Scala 中重命名 DataFrame 的列名称

scala - 错误 : org. apache.spark.rdd.RDD[(String,Int)] 不带参数

python - pandas 按分钟比较时区感知日期时间字段

python - 如何平整 Pandas 数据框中的每 n 行

scala - 如何将 Spark 的累加器传递给函数?

java - 了解集群状态更新

java - Apache Spark 和 scala,执行查询时出错

python - 从一个数据帧的列中获取唯一值,并使用它来过滤另一个数据帧中的行