apache-spark - Is there a way to use "set *" in a Spark/Databricks merge query when not all columns are in the source?

Tags: apache-spark databricks delta-lake

I am merging data from one table into another in Spark/Databricks. I can do an update set * if all columns are selected, but the operation fails when the source query does not produce every target column (for example, if the dst table in the query below has a col9), because set * and insert * expand to one assignment per target column, each taken from a same-named source column. Is there a way to perform this merge without repeating the long column list (keyed on the source query's column names) in both the when matched and when not matched clauses? A quick check of the mismatch is sketched after the query.

merge into demo dst
using (
select distinct
  col1,
  col2,
  col3,
  col4,
  col5,
  col6,
  col7,
  col8
from
  demo_raw
where 1=1
  and col1 = 'ac'
  and col2 is not null
) src
on src.col1 = dst.col1
when matched then
  update set *
when not matched then
  insert *
;  
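A quick way to confirm which target columns the source query is missing, sketched with the table names above and the select list written out by hand:

%python
# target columns that the source query does not produce --
# any hit here is enough to make "update set *" fail
src_cols = {'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8'}
missing = [c for c in spark.table("demo").columns if c not in src_cols]
print(missing)  # e.g. ['col9']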

Best answer

Here is a Python solution I am fairly happy with:

Function definition:

%python
def merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False):

  # build the merge statement as one SQL string
  sqlString = ""
  sqlString += "merge into " + dstTableName + " t\n"
  sqlString += "using (\n"
  sqlString += "select\n"
  sqlString += ",\n".join("  " + c for c in selectCols) + "\n"
  sqlString += "from " + srcTableName + "\n"
  sqlString += whereStr + "\n"
  sqlString += ") s\n"
  sqlString += "on\n"
  sqlString += joinStr + "\n"
  # update only the listed columns, so extra target columns are left untouched
  sqlString += "when matched then update set\n"
  sqlString += ",\n".join("  " + c + " = s." + c for c in cols) + "\n"
  # insert the same column list for rows with no match
  sqlString += "when not matched then\n"
  sqlString += "insert (\n"
  sqlString += ",\n".join("  " + c for c in cols) + "\n"
  sqlString += ")\n"
  sqlString += "values (\n"
  sqlString += ",\n".join("  s." + c for c in cols) + "\n"
  sqlString += ")"

  if debug:
    print(sqlString)

  spark.sql(sqlString)
  return sqlString
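
If maintaining the column list by hand is also a burden, it can be derived from the table schemas instead. A sketch, using the question's table names and keeping the source's column order:

%python
# shared columns between source and target, in source order;
# the same list can then drive both the update and the insert clauses
src_cols = spark.table("demo_raw").columns
dst_cols = set(spark.table("demo").columns)
cols = [c for c in src_cols if c in dst_cols]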

Example call:

# define the tables (FROM MY_PROJECT.DEMO TO DEMO)
srcTableName = "my_project.demo"
dstTableName = "demo"
# define the where clause (ONLY AC DATA)
whereStr = """
where 1=1
  and org = 'ac'
  and org_patient_id is not null
"""
# define the join (MATCH ON PATIENT_ID)
joinStr = "t.patient_id = s.patient_id"
# define the columns (JUST THESE COLUMNS)
cols = [
  'org',
  'data_lot',
  'raw_table',
  'org_patient_id',
  'patient_id',
  'sex',
  'sexual_orientation',
  'year_of_birth',
  'year_of_death',
  'race',
  'ethnicity',
  'city',
  'state',
  'zip'  
]
# create any needed aliases for the query string
selectCols = [x if x != 'year_of_birth' else '(2020 - age) as year_of_birth' for x in cols]
# do the merge
merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols)
print("Done.")

A similar question on Stack Overflow: https://stackoverflow.com/questions/66505889/
