python - Synapse 工作区中的 PySpark Windows 函数(超前、滞后)

标签 python dataframe apache-spark pyspark apache-spark-sql

场景:

  • 工单具有 StartDateEndDate ,如果 StartDateEndDate 存在,则创建一个新的数据帧为显示在下面所需的输出中。

Pyspark 数据集如下所示

#base Schema for Testing purpose
#Dataset

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

#Create User defined Custom Schema using StructType
schema = StructType([ StructField('CaseNumber', StringType(), True)\
                       ,StructField('StartTime', StringType(), True)\
                       ,StructField('EndTime', StringType(), True)])

data = [
        {"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
        {"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
        {"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
        ]

from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Creation of a dummy dataframe:
df1 = spark.createDataFrame(data,schema=schema)

df1.show()

已创建数据集:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|          NaN|
|   Ticket1|          NaN|1/23/19 11:00|
|   Ticket1| 1/25/19 7:00|          NaN|
|   Ticket1| 1/27/19 3:00|          NaN|
|   Ticket2|1/29/19 10:00|          NaN|
|   Ticket2|          NaN| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00|          NaN|
|   Ticket2|          NaN| 3/27/19 8:00|
|   Ticket2|          NaN|3/27/19 10:00|
|   Ticket3| 4/25/19 1:00|          NaN|
+----------+-------------+-------------+

所需的输出应该是:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|1/23/19 11:00|
|   Ticket2|1/29/19 10:00| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+

应用 Lead 函数来查看工单是否存在 endtime

from pyspark.sql.window import Window
import pyspark.sql.functions as psf

windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
df = df1.withColumn("lead",lead("EndTime",1).over(windowSpec))
df.show()

pysparkdf = df.toPandas()

import pandas as pd 
tickets = pysparkdf.groupby('CaseNumber')

def isLeadnull(e): 
    return e['lead'] != None

my_list = []
for i,ticket in tickets:
    for j,e in ticket.iterrows() :
        if  isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'] ,'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'],'Do nothing as condition not met')

此函数后的输出是:

[{'CaseNumber': 'Ticket1',
  'Start': '1/22/19 10:00',
  'EndTime': '1/23/19 11:00'},
 {'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2',
  'Start': '1/29/19 10:00',
  'EndTime': '2/23/19 2:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]

最佳答案

这是一种间隙和岛屿问题。您可以通过创建group列,使用条件累积和来识别“岛屿”,然后您可以按CaseNumber + group<进行分组 并聚合每个组的最大 StartTime 和最小 EndTime:

from pyspark.sql import functions as F, Window

# first, convert strings to timestamps and replacing empty strings with nulls
df1 = df1.withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm")) \
    .replace("", None)

w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")

df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber|          StartTime|            EndTime|
#+----------+-------------------+-------------------+
#|   Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#|   Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#|   Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+

要理解逻辑,您可以在组之前逐步显示中间列:

df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()

#+----------+-------------------+-------------------+-----+
#|CaseNumber|          StartTime|            EndTime|group|
#+----------+-------------------+-------------------+-----+
#|   Ticket1|2019-01-22 10:00:00|               null|    1|
#|   Ticket1|               null|2019-01-23 11:00:00|    1|
#|   Ticket1|2019-01-25 07:00:00|               null|    2|
#|   Ticket1|2019-01-27 03:00:00|               null|    3|
#|   Ticket2|2019-01-29 10:00:00|               null|    1|
#|   Ticket2|               null|2019-02-23 02:00:00|    1|
#|   Ticket2|2019-03-25 07:00:00|               null|    2|
#|   Ticket2|               null|2019-03-27 08:00:00|    2|
#|   Ticket2|               null|2019-03-27 10:00:00|    2|
#|   Ticket3|2019-04-25 01:00:00|               null|    1|
#+----------+-------------------+-------------------+-----+

关于python - Synapse 工作区中的 PySpark Windows 函数(超前、滞后),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70819127/

相关文章:

python - TypeError:在字符串格式化python期间并非所有参数都转换了

python - 如何对 groupby 结果的第一个值求和并将其写入相关组中的 df 列

scala - 将 n 个元素的 RDD 转换为单个元素的 RDD

python - 如何将 Panda 类型的数据转换为 Panda.Dataframe?

python - 在没有 map 的情况下替换 pandas 数据框中的多个值的优雅方法?

oracle - 使用 Spark 查询 Oracle DB 时出现 "ORA-00933: SQL command not properly ended"错误

scala - 使用 Apache Zeppelin 重新运行带有 -deprecation 的 Scala 代码

python - 如何在HBase中关闭WAL,

python - 在 python 子进程中使用 exec 查找命令会出错

python - 使用 Python 3.x 从网站中提取 JSON 数据