python - pyspark intersection() 函数比较数据帧

标签 python hadoop pyspark pyspark-sql

下面是我编写的用于比较两个数据帧并对它们施加交集函数的代码。

import os
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://xxx:xxx").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxxx").option("password","xxxx").load()

df.registerTempTable("test")

df1= sqlContext.sql("select * from test where amitesh<= 300")
df2= sqlContext.sql("select * from test where amitesh <= 400")

df3= df1.intersection(df2)
df3.show()

我收到以下错误:
AttributeError: 'DataFrame' object has no attribute 'intersection'

如果我的理解是正确的,intersection() 是从 python set 函数派生的内置子函数。所以,

1)如果我想在 pyspark 中使用它,我是否需要在我的代码中导入任何特殊模块,或者它也应该作为 pyspark 的内置模块工作?

2)要使用这个intersection()函数,我们首先需要将df转换为rdd吗?

请在我错的地方纠正我。有人可以给我一个工作的例子吗?

我的动机是从 SQL Server 获取公共(public)记录并转移到 HIVE。到目前为止,我首先尝试让我的交集函数工作,然后从 HIVE 要求开始,如果 intersection() 工作,我可以照顾。

最佳答案

我让它为我工作,而不是 intersect(),而不是 intersect(),它有效。

关于python - pyspark intersection() 函数比较数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48004584/

相关文章:

python - 我可以在没有 android 和没有 tensorboard 的情况下构建 tensorflow 吗?

hadoop - Flume 和远程 HDFS 接收器错误

mysql - 配置单元.HiveImport : FAILED: SemanticException [Error 10072]: Database does not exist:

scala - 如何在Spark中顺序处理两个RDD?

Python datetime.strptime 月份说明符似乎不起作用

python - 如何修复 python 3.4 tkinter "Index Error: list index out of range"中的此错误

pyspark - 使用 python 序列化自定义转换器以在 Pyspark ML 管道中使用

pyspark - Hyperopt 无法使用跟踪 URI : databricks 执行 mlflow.end_run()

python - PySpark 将数据帧中数组中的元素映射到另一个数据帧

python - 需要一个脚本来遍历文件并执行命令