python - PySpark DataFrame - 动态加入多列

标签 python apache-spark dataframe pyspark apache-spark-sql

假设我在 Spark 上有两个 DataFrame

firstdf = sqlContext.createDataFrame([{'firstdf-id':1,'firstdf-column1':2,'firstdf-column2':3,'firstdf-column3':4}, \
{'firstdf-id':2,'firstdf-column1':3,'firstdf-column2':4,'firstdf-column3':5}])

seconddf = sqlContext.createDataFrame([{'seconddf-id':1,'seconddf-column1':2,'seconddf-column2':4,'seconddf-column3':5}, \
{'seconddf-id':2,'seconddf-column1':6,'seconddf-column2':7,'seconddf-column3':8}])

现在我想通过多列(任何大于一的数字)加入它们

我拥有的是第一个 DataFrame 的列数组和第二个 DataFrame 的列数组,这些数组具有相同的大小,我想通过这些数组中指定的列加入。例如:
columnsFirstDf = ['firstdf-id', 'firstdf-column1']
columnsSecondDf = ['seconddf-id', 'seconddf-column1']

由于这些数组的大小可变,我不能使用这种方法:
from pyspark.sql.functions import *

firstdf.join(seconddf, \
    (col(columnsFirstDf[0]) == col(columnsSecondDf[0])) &
    (col(columnsFirstDf[1]) == col(columnsSecondDf[1])), \
    'inner'
)

有什么方法可以动态加入多个列?

最佳答案

为什么不使用简单的理解:

firstdf.join(
    seconddf, 
   [col(f) == col(s) for (f, s) in zip(columnsFirstDf, columnsSecondDf)], 
   "inner"
)

由于您使用了逻辑,因此提供不带 & 的条件列表就足够了。运算符(operator)。

关于python - PySpark DataFrame - 动态加入多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39606589/

相关文章:

python - FastCGI、Lighttpd 和 Flask

Java+Spark wordCount 与 EMR

apache-spark - 为什么Spark的重新分区没有将数据平衡到分区中?

pandas - Python Pandas : if the data is NaN, 然后更改为 0,否则在数据框中更改为 1

python - 如何使用python根据数据框中的列元素更改文件夹中的文件名

r - 如何在R中逐行读取json文件?

python - Pydev:将标准输出发送到真实的(tty)终端

python - 在 Pandas 中混合数据帧

Python Kivy屏幕管理器 'AttributeError'

apache-spark - Spark 数据框列命名约定/限制