python - Merging date ranges in a Spark dataframe

Tags: python, apache-spark, pyspark, apache-spark-sql, date-range

I have a question similar to this one.

However, I am working with a very large dataset, and I would like to see whether I can do the same thing in PySpark instead of pandas. Below is the pandas solution. Can this be done in PySpark?

def merge_dates(grp):
    # Find contiguous date groups, and get the first/last start/end date for each group.
    dt_groups = (grp['StartDate'] != grp['EndDate'].shift()).cumsum()
    return grp.groupby(dt_groups).agg({'StartDate': 'first', 'EndDate': 'last'})

# Perform a groupby and apply the merge_dates function, followed by formatting.
df = df.groupby(['FruitID', 'FruitType']).apply(merge_dates)
df = df.reset_index().drop('level_2', axis=1) 
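
For illustration, here is a minimal, self-contained sketch (the sample data and variable names are made up, not taken from the question) of what the snippet above does on a tiny frame, reusing the merge_dates function defined above:

import pandas as pd

sample = pd.DataFrame({
    'FruitID':   [1, 1, 1, 2, 2],
    'FruitType': ['Apple', 'Apple', 'Apple', 'Orange', 'Orange'],
    'StartDate': pd.to_datetime(['2015-01-01', '2016-01-01', '2017-01-01',
                                 '2015-01-01', '2016-05-31']),
    'EndDate':   pd.to_datetime(['2016-01-01', '2017-01-01', '2018-01-01',
                                 '2016-01-01', '2017-01-01']),
})

# Apply the same groupby/apply as above; rows must be sorted by StartDate within each group.
merged = sample.groupby(['FruitID', 'FruitType']).apply(merge_dates)
merged = merged.reset_index().drop('level_2', axis=1)
print(merged)
# The three Apple rows collapse into a single 2015-01-01 -> 2018-01-01 range,
# while the two Orange rows stay separate because 2016-01-01 != 2016-05-31.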

Best answer

We can use a Window with the lag function to compute the contiguous groups, and then aggregate them in a similar fashion to the pandas function you shared. A working example is given below, hope it helps!

import pandas as pd
from dateutil.parser import parse
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F


# EXAMPLE DATA -----------------------------------------------

pdf = pd.DataFrame({
    'FruitID':   [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4],
    'FruitType': ['Apple', 'Apple', 'Apple', 'Orange', 'Orange', 'Orange',
                  'Banana', 'Banana', 'Blueberry', 'Mango', 'Kiwi', 'Mango'],
    'StartDate': [parse(x) for x in ['2015-01-01', '2016-01-01', '2017-01-01', '2015-01-01', '2016-05-31',
                                     '2017-01-01', '2015-01-01', '2016-01-01', '2017-01-01', '2015-01-01', '2016-09-15', '2017-01-01']],
    'EndDate':   [parse(x) for x in ['2016-01-01', '2017-01-01', '2018-01-01', '2016-01-01', '2017-01-01',
                                     '2018-01-01', '2016-01-01', '2017-01-01', '2018-01-01', '2016-01-01', '2017-01-01', '2018-01-01']],
})

pdf = pdf.sort_values(['FruitID', 'StartDate'])
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pdf)


# FIND CONTIGUOUS GROUPS AND AGGREGATE ---------------------

# A new group starts whenever the previous EndDate differs from the current StartDate.
w = Window.partitionBy("FruitType").orderBy("StartDate")
contiguous = F.when(
    F.datediff(F.lag("EndDate", 1).over(w), F.col("StartDate")) != 0, F.lit(1)
).otherwise(F.lit(0))

# A running sum of the break indicator labels each contiguous block,
# which is then collapsed to its first StartDate and last EndDate.
df = (df
      .withColumn('contiguous_grp', F.sum(contiguous).over(w))
      .groupBy('FruitType', 'contiguous_grp')
      .agg(F.first('StartDate').alias('StartDate'), F.last('EndDate').alias('EndDate'))
      .drop('contiguous_grp'))
df.show()

Output:

+---------+-------------------+-------------------+
|FruitType|          StartDate|            EndDate|
+---------+-------------------+-------------------+
|   Orange|2015-01-01 00:00:00|2016-01-01 00:00:00|
|   Orange|2016-05-31 00:00:00|2018-01-01 00:00:00|
|   Banana|2015-01-01 00:00:00|2017-01-01 00:00:00|
|     Kiwi|2016-09-15 00:00:00|2017-01-01 00:00:00|
|    Mango|2015-01-01 00:00:00|2016-01-01 00:00:00|
|    Mango|2017-01-01 00:00:00|2018-01-01 00:00:00|
|    Apple|2015-01-01 00:00:00|2018-01-01 00:00:00|
|Blueberry|2017-01-01 00:00:00|2018-01-01 00:00:00|
+---------+-------------------+-------------------+
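
If you also need FruitID in the result, as in the pandas groupby on ['FruitID', 'FruitType'], the same approach should work with both key columns in the window and in the groupBy. Below is a sketch under that assumption (df_raw is a hypothetical name for the original, unmerged DataFrame; it is not defined in the answer above, and the imports from the answer are reused). It uses min/max instead of first/last so the aggregation does not depend on row order within each group, which gives the same result for contiguous ranges:

# df_raw: the original, unmerged Spark DataFrame with
# FruitID, FruitType, StartDate and EndDate columns (assumed).
w2 = Window.partitionBy("FruitID", "FruitType").orderBy("StartDate")
is_break = F.when(
    F.datediff(F.lag("EndDate", 1).over(w2), F.col("StartDate")) != 0, F.lit(1)
).otherwise(F.lit(0))

merged = (df_raw
          .withColumn("grp", F.sum(is_break).over(w2))
          .groupBy("FruitID", "FruitType", "grp")
          .agg(F.min("StartDate").alias("StartDate"),
               F.max("EndDate").alias("EndDate"))
          .drop("grp"))
merged.show()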

Regarding python - Merging date ranges in a Spark dataframe, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54467666/
