下面是我编写的用于比较两个数据帧并对它们施加交集函数的代码。
import os
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://xxx:xxx").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxxx").option("password","xxxx").load()
df.registerTempTable("test")
df1= sqlContext.sql("select * from test where amitesh<= 300")
df2= sqlContext.sql("select * from test where amitesh <= 400")
df3= df1.intersection(df2)
df3.show()
我收到以下错误:
AttributeError: 'DataFrame' object has no attribute 'intersection'
如果我的理解是正确的,intersection() 是从 python set 函数派生的内置子函数。所以,
1)如果我想在 pyspark 中使用它,我是否需要在我的代码中导入任何特殊模块,或者它也应该作为 pyspark 的内置模块工作?
2)要使用这个intersection()函数,我们首先需要将df转换为rdd吗?
请在我错的地方纠正我。有人可以给我一个工作的例子吗?
我的动机是从 SQL Server 获取公共(public)记录并转移到 HIVE。到目前为止,我首先尝试让我的交集函数工作,然后从 HIVE 要求开始,如果 intersection() 工作,我可以照顾。
最佳答案
我让它为我工作,而不是 intersect(),而不是 intersect(),它有效。
关于python - pyspark intersection() 函数比较数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48004584/