python - PySpark method to fetch only updated and new records

Tags: python hadoop pyspark

I am using PySpark 2.1. Below are my dataframes.

Yesterday's data (yesterday_data_df):

1,Nagraj,Keshav,2017-11-20 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

2,Raghu,HR,2017-11-20 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

Today's data (today_data_df):

1,Nagraj,K,2017-11-21 00:02:39.867000000,2017-11-21 00:02:39.867000000

2,Raghu,HR,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000

3,Ramya,Govindaraju,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000

My expected output:

1,Nagraj,K,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

3,Ramya,Govindaraju,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

I should not fetch records that are present unchanged in both dataframes. Since only the first record's name has changed, I should fetch that one, and the 3rd record is new.

I used the following logic:

from pyspark.sql.functions import col

df = (today_data_df.select("id").subtract(yesterday_data_df.select("id")).toDF('d1')
      .join(today_data_df, col('d1') == today_data_df.id).drop('d1'))

The output is:

3,Ramya,Govindaraju,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

but I should be getting the output given below; any help is appreciated:

1,Nagraj,K,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038

3,Ramya,Govindaraju,2017-11-21 00:02:39.867000000,2017-11-20 00:02:39.867000000,20171120060038
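The subtract step appears to be the problem: it compares only the id column, and id 1 is present in both dataframes, so only the new id survives before the join. A quick check, assuming the two dataframes above are loaded as yesterday_data_df and today_data_df:

today_data_df.select("id").subtract(yesterday_data_df.select("id")).show()
# +---+
# | id|
# +---+
# |  3|
# +---+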

Best Answer

I am assuming there is a single name field that contains a ','.

ydata = [(1, 'Nagraj,Keshav', '2017-11-20 00:02:39.867000000', '2017-11-20 00:02:39.867000000', 20171120060038),
         (2, 'Raghu,HR', '2017-11-20 00:02:39.867000000', '2017-11-20 00:02:39.867000000', 20171120060038)]
yschema = ['id', 'name', 'fdate', 'tdate', 'stamp']
tdata = [(1, 'Nagraj,K', '2017-11-21 00:02:39.867000000', '2017-11-21 00:02:39.867000000', 20171120060038),
         (2, 'Raghu,HR', '2017-11-21 00:02:39.867000000', '2017-11-20 00:02:39.867000000', 20171120060038),
         (3, 'Ramya,Govindaraju', '2017-11-21 00:02:39.867000000', '2017-11-20 00:02:39.867000000', 20171120060038)]
ydf = spark.createDataFrame(ydata, yschema)
tdf = spark.createDataFrame(tdata, yschema)

# (id, name) pairs that exist today but not yesterday:
# new ids plus ids whose name changed.
newdf = tdf.select('id', 'name').subtract(ydf.select('id', 'name'))

# Join back to today's data to recover the remaining columns.
newdf.join(tdf, newdf['id'] == tdf['id']).drop(tdf['id']).drop(tdf['name']).show()

Output:

    +---+-----------------+--------------------+--------------------+--------------+
    | id|             name|               fdate|               tdate|         stamp|
    +---+-----------------+--------------------+--------------------+--------------+
    |  1|         Nagraj,K|2017-11-21 00:02:...|2017-11-21 00:02:...|20171120060038|
    |  3|Ramya,Govindaraju|2017-11-21 00:02:...|2017-11-20 00:02:...|20171120060038|
    +---+-----------------+--------------------+--------------------+--------------+
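The same rows can also be obtained in a single step with a left_anti join, which keeps the rows of tdf that have no (id, name) match in ydf and avoids the separate subtract-and-join. A minimal sketch, reusing ydf and tdf from above:

# Rows of tdf whose (id, name) pair does not appear in ydf:
# new ids plus ids whose name changed.
tdf.join(ydf.select('id', 'name'), on=['id', 'name'], how='left_anti').show()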

Regarding python - PySpark method to fetch only updated and new records, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47426943/
