scala - 如何在 Apache Spark 中执行 UPSERT 或 MERGE 操作?

标签 scala apache-spark

我正在尝试使用 Apache Spark 使用唯一列“ID”更新记录并将其插入旧 Dataframe。

最佳答案

为了更新Dataframe,您可以对唯一列执行“left_anti”连接,然后将其与包含新记录的Dataframe进行UNION

def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String]): Dataset[_] = {
    val filteredNewDS = selectAndCastColumns(newDS, oldDS)
    oldDS.join(
      filteredNewDS,
      usingColumns,
      "left_anti")
      .select(oldDS.columns.map(columnName => col(columnName)): _*)
      .union(filteredNewDS.toDF)
  }

  def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
    val columns = ds.columns.toSet
    ds.select(refDS.columns.map(c => {
      if (!columns.contains(c)) {
        lit(null).cast(refDS.schema(c).dataType) as c
      } else {
        ds(c).cast(refDS.schema(c).dataType) as c
      }
    }): _*)
  }

val df = refreshUnion(oldDS, newDS, Seq("ID"))

关于scala - 如何在 Apache Spark 中执行 UPSERT 或 MERGE 操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58797257/

相关文章:

scala - 如何在多个项目之间共享 sbt 插件配置?

scala - 带排序的参数化方法?

scala - 关于 Spark 的 RDD 的 take 和 takeOrdered 方法

scala - 在应用agg函数之前如何将十进制值限制为2位数?

scala - 可遍历的结构类型

java - sbt 有 M2_HOME 模拟吗?

scala - 本地依赖由 SBT 解决,但不是由 Play 解决的!框架

sql - 对象 sql 不是包 org.apache.spark 的成员

scala - Spark MLlib ALS 中的非整数 ID

apache-spark - 如何修复 NetworkWordCount Spark Streaming 应用程序中的 "org.apache.spark.shuffle.FetchFailedException: Failed to connect"?