python - Pyspark DataFrame - 如何使用变量进行连接?

标签 python apache-spark dataframe pyspark apache-spark-sql

我在使用 Python 上的 Spark 数据帧连接两个数据帧时遇到了一些麻烦。我有两个数据框,我必须更改列的名称以使它们对于每个数据框都是唯一的,因此稍后我可以分辨出哪个列是哪个。我这样做是为了重命名列(firstDf 和 secondDf 是使用函数 createDataFrame 创建的 Spark DataFrames):

oldColumns = firstDf.schema.names
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns))
firstDf = firstDf.toDF(*newColumns)

我对第二个 DataFrame 重复了这个。然后我尝试使用以下代码加入他们:

from pyspark.sql.functions import *

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner')

像这样使用它会出现以下错误:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

这只是为了说明该列存在于输入列数组中。

如果我不重命名 DataFrames 列,我可以使用这段代码加入它们:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner')

但这给了我一个列名不明确的 DataFrame。

关于如何解决这个问题有什么想法吗?

最佳答案

一般来说,不要在名称中使用点。这些具有特殊含义(可用于确定表或访问 struct 字段)并且需要一些额外的工作才能正确识别。

对于等值连接,您只需要一个列名:

from pyspark.sql.functions import col

firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))

column = 'firstColumn'
firstDf.join(secondDf, [column], 'inner')

## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string]

对于复杂的情况,使用表别名:

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'

firstDf.alias("firstDf").join(
    secondDf.alias("secondDf"),
    # After alias prefix resolves to table name
    col(firstColumn) == col(secondColumn),
   "inner"
)

## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string]

您也可以直接使用父框架:

column = 'firstColumn'

firstDf.join(secondDf, firstDf[column] == secondDf[column])

关于python - Pyspark DataFrame - 如何使用变量进行连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39583773/

相关文章:

python - 我应该安装 'MySQL-Python' 连接器中的哪一个?

python - 值错误 : invalid literal for int() with base 10 for non-digits

python-3.x - DataFrame 显示不符合预期

apache-spark - 如何将字符串冒号分隔的列转换为 MapType?

apache-spark - Spark sql 日期添加

python - Pandas - 列的唯一类型的相等出现

python - 如何计算数据框字段中字符串出现的次数?

python - 有什么方法可以将自定义/调试消息添加到 python/django unittest.TestCase 的失败测试方法的详细信息中?

python - 使用 PYODBC 连接到 SQL 服务器

scala - Spark 执行并行度不够的任务