apache-spark - 在 Spark SQL 中插入如果不存在 ELSE 更新

标签 apache-spark apache-spark-sql

是否有任何规定在 Spark SQL 中执行“INSERT IF NOT EXISTS ELSE UPDATE”。

我有包含一些记录的 Spark SQL 表“ABC”。
然后我有另一批记录,我想根据它们是否存在于该表中,在该表中插入/更新。

是否有我可以在 SQL 查询中使用的 SQL 命令来实现这一点?

最佳答案

在常规 Spark 中,这可以通过 join 实现后跟一个 map像这样:

import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "orginal"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")

val df3 = df1
  .join(df2, 'df1_id === 'df2_id, "outer")
  .map(row => {
    if (row.isNullAt(2))
      (row.getString(0), row.getString(1))
    else
      (row.getString(2), row.getString(3))
  })

这产生:
scala> df3.show
+---+--------+
| _1|      _2|
+---+--------+
|id3|     new| 
|id1|     new|
|id2|original|
+---+--------+

您也可以使用 selectudfs而不是 map ,但在这种带有空值的特殊情况下,我个人更喜欢 map变体。

关于apache-spark - 在 Spark SQL 中插入如果不存在 ELSE 更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45700834/

相关文章:

apache-spark - DataFrameReader 在读取 avro 文件时抛出 "Unsupported type NULL"

apache-spark - SparkSql 如果值为 null 则取前一个

apache-spark - 如何修复 'NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena() on EMR'

scala - Scala 和 Apache Spark 上的 csv 中的空值

apache-spark - Spark Sum和计数的性能问题

java - Spark 采样 - 比使用完整的 RDD/DataFrame 快多少

java - 从 Spark 中的压缩中读取整个文本文件

apache-spark - MLlib 的输入格式问题

list - 数据框列中的嵌套列表,提取数据框列中列表的值 Pyspark Spark

apache-spark - 在 Spark 2.4 中,Spark JDBC 是否允许将内置函数指定为 partitionColumn?