scala - 根据 Spark 结构化流中的多个条件更新其他列的列值

标签 scala apache-spark-sql spark-streaming multiple-conditions

我想根据多个条件使用另外两列更新一列中的值。例如 - 流就像:

    +---+---+----+---+
    | A | B | C  | D |
    +---+---+----+---+
    | a | T | 10 | 0 |
    | a | T | 100| 0 |
    | a | L | 0  | 0 |
    | a | L | 1  | 0 |
    +---+---+----+---+

我有多个条件,例如 -

(B = "T" && C > 20 ) OR (B = "L" && C = 0)

“T”20“L”0是动态的。 AND/OR 运算符也在运行时提供。我希望只要条件成立,就使 D = 1 成立,否则应保持 D = 0。条件的数量也是动态的。

我尝试将它与 spark-sql 中的 UPDATE 命令一起使用,即 UPDATE df SET D = '1' WHERE CONDITIONS。但它说尚不支持更新。生成的数据框应该是 -

+---+---+----+---+
| A | B | C  | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 1 |
| a | L | 0  | 1 |
| a | L | 1  | 0 |
+---+---+----+---+

有什么办法可以实现这个目标吗?

最佳答案

我希望你正在使用Python。也会为 Scala 发布同样的内容!使用udf

PYTHON

>>> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

>>> def get_column(B, C):
...     return int((B == "T" and C > 20) or (B == "L" and C == 0))
...
>>> fun = udf(get_column)
>>> res = df.withColumn("D", fun(df['B'], df['C']))>>> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

SCALA

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+


scala> def get_column(B : String, C : Int) : Int = {     
     |     if((B == "T" && C > 20) || (B == "L" && C == 0))
     |         1     
     |     else
     |         0
     | }
get_column: (B: String, C: Int)Int

scala> val fun = udf(get_column _)
fun: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, IntegerType)
))

scala> val res = df.withColumn("D", fun(df("B"), df("C")))
res: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more fields]

scala> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

您还可以使用 case whenotherwise,如下所示:

PYTHON

>>> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

>>> new_column = when(
        (col("B") == "T") & (col("C") > 20), 1
    ).when((col("B") == "L") & (col("C") == 0), 1).otherwise(0)

>>> res = df.withColumn("D", new_column)
>>> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

SCALA

scala> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

scala> val new_column = when(
     |     col("B") === "T" && col("C") > 20, 1
     | ).when(col("B") === "L" && col("C") === 0, 1 ).otherwise(0)

new_column: org.apache.spark.sql.Column = CASE WHEN ((B = T) AND (C > 20)) THEN 1 WHEN ((B = L) AND (C = 0)) THEN 1 ELSE 0 END

scala> df.withColumn("D", new_column).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

关于scala - 根据 Spark 结构化流中的多个条件更新其他列的列值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51188970/

相关文章:

Java Logger 向控制台生成多个相同的日志

scala - 如何从spark将文件写入cassandra

apache-spark - PySpark - 在不使用 UDF 或 join 的情况下创建多个聚合映射列

java - 在 Spark 中为 Dataframe 构建选择子句

scala - 为什么 Scala 不允许在函数类型定义中使用参数名称?

scala 泛型和继承

apache-spark - Spark结构化流作业在群集模式下失败

Python - 将整数或字符串发送到 Spark-Streaming

java - KafkaConsumer 在轮询时进入无限期等待状态

apache-spark - PySpark - 运行进程