I am merging data from one table into another in Spark/Databricks. When all columns are selected I can use `update set *`, but this fails when the source query selects only a subset of the destination's columns (for example, if the dst table in the query below also has a col9). Is there a way to perform this merge without repeating the long column lists in the `when matched` clause (i.e. matching by column name from the source query) and the `when not matched` clause?
merge into demo dst
using (
select distinct
col1,
col2,
col3,
col4,
col5,
col6,
col7,
col8
from
demo_raw
where 1=1
and col1 = 'ac'
and col2 is not null
) src
on src.col1 = dst.col1
when matched then
update set *
when not matched then
insert *
;
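One way to avoid maintaining the column list by hand is to compute the columns the two tables share. In Databricks, `spark.table(name).columns` returns a table's column names as a list of strings; below is a minimal, Spark-free sketch with hard-coded lists standing in for the real schemas (the column names mirror the question; the intersection logic is the point):

```python
# In Databricks these lists would come from the live schemas:
#   src_cols = spark.table("demo_raw").columns
#   dst_cols = spark.table("demo").columns
src_cols = ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8"]
dst_cols = ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"]

# columns present in both tables, preserving the source order
shared = [c for c in src_cols if c in dst_cols]
print(shared)
```

The `shared` list can then drive both the select list and the generated `update set` / `insert` clauses.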
Best Answer
Here is a Python solution I am fairly happy with:
Function definition:
%python
def merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False):
    # build the merge statement as a string
    sqlString = "merge into " + dstTableName + " t\n"
    sqlString += "using (\n"
    sqlString += "select\n"
    sqlString += ",\n".join("  " + c for c in selectCols) + "\n"
    sqlString += "from " + srcTableName + "\n"
    sqlString += whereStr.strip() + "\n"
    sqlString += ") s\n"
    sqlString += "on\n"
    sqlString += joinStr + "\n"
    # update every listed column from the source alias
    sqlString += "when matched then update set\n"
    sqlString += ",\n".join("  " + c + " = s." + c for c in cols) + "\n"
    # insert the same column list for unmatched rows
    sqlString += "when not matched then\n"
    sqlString += "insert (\n"
    sqlString += ",\n".join("  " + c for c in cols) + "\n"
    sqlString += ")\n"
    sqlString += "values (\n"
    sqlString += ",\n".join("  " + c for c in cols) + "\n"
    sqlString += ")"
    if debug:
        print(sqlString)
    spark.sql(sqlString)
    return sqlString
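The generated clauses can be checked without a cluster; this minimal, Spark-free sketch rebuilds just the `update set` list the same way the function above does:

```python
def build_update_set(cols):
    # "col = s.col" pairs, comma-separated, one per line
    return ",\n".join("  {0} = s.{0}".format(c) for c in cols)

print(build_update_set(["col1", "col2"]))
# prints:
#   col1 = s.col1,
#   col2 = s.col2
```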
Example call:
# define the tables (FROM MY_PROJECT.DEMO TO DEMO)
srcTableName = "my_project.demo"
dstTableName = "demo"
# define the where clause (ONLY AC DATA)
whereStr = """
where 1=1
and org = 'ac'
and org_patient_id is not null
"""
# define the join (MATCH ON PATIENT_ID)
joinStr = "t.patient_id = s.patient_id"
# define the columns (JUST THESE COLUMNS)
cols = [
'org',
'data_lot',
'raw_table',
'org_patient_id',
'patient_id',
'sex',
'sexual_orientation',
'year_of_birth',
'year_of_death',
'race',
'ethnicity',
'city',
'state',
'zip'
]
# create any needed aliases for the select list
selectCols = [x if x != 'year_of_birth' else '(2020 - age) as year_of_birth' for x in cols]
# do the merge
merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols)
print("Done.")
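An alternative to building SQL strings is Delta Lake's `DeltaTable` Python API, whose `whenMatchedUpdate(set=...)` and `whenNotMatchedInsert(values=...)` accept column-to-expression dicts, so one column list can drive both clauses. A sketch using the table and join from the example above (it needs a live Spark session with Delta, so the merge call itself is shown commented out):

```python
def build_merge_map(cols, src_alias="s"):
    # map each destination column to the matching source expression
    return {c: src_alias + "." + c for c in cols}

cols = ["org", "data_lot", "patient_id"]
print(build_merge_map(cols))

# With a live Spark session (Databricks ships Delta by default):
# from delta.tables import DeltaTable
# (DeltaTable.forName(spark, "demo").alias("t")
#     .merge(src_df.alias("s"), "t.patient_id = s.patient_id")
#     .whenMatchedUpdate(set=build_merge_map(cols))
#     .whenNotMatchedInsert(values=build_merge_map(cols))
#     .execute())
```

Here `src_df` is assumed to be a DataFrame holding the filtered source rows (the equivalent of the `using (...)` subquery).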
Regarding "apache-spark - Is there a way to do 'set *' on a Spark/Databricks merge query when not all columns are in the source?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66505889/