session - pyspark calculate session duration grouped by user id

Tags: session hadoop apache-spark pyspark

I am trying to calculate the session duration for each user id in my event data using pyspark. A sample of the data looks like this:

diff_session.show(8,False):

|userid|platform            |previousTime           |currentTime            |timeDifference |
|1234  |13                  |null                   |2017-07-20 10:49:30.027|null           |
|1234  |13                  |null                   |2017-07-20 10:04:23.1  |null           |
|1234  |13                  |2017-07-20 10:04:23.1  |2017-07-20 10:06:23.897|120            |
|1234  |13                  |2017-07-20 10:04:23.897|2017-07-20 10:40:29.472|2166           |
|1234  |13                  |2017-07-20 10:40:29.472|2017-07-20 10:40:50.347|11             |
|1234  |13                  |2017-07-20 10:40:30.347|2017-07-20 10:51:16.458|646            |
|1234  |13                  |2017-07-20 10:51:16.458|2017-07-20 10:51:17.427|1              |
  1. I want to group by userid and platform
  2. Then, within each group, I want to set currentTime == previousTime whenever timeDifference > 2000 or timeDifference is null. This is what I tried:

    from pyspark.sql import functions

    df_session.select(df_session.userid, df_session.platform,
                      functions.when(df_session.timeDifference > 2000, df_session.previousTime)
                               .otherwise(df_session.currentTime))

    df_session.select(df_session.userid, df_session.platform,
                      functions.when(df_session.timeDifference.isNull(), df_session.currentTime)
                               .otherwise(df_session.previousTime))
    
  3. Then I want to sum up all the time differences that are smaller than 2000 and add that total to currentTime, so the result would look like this:

    |userid|platform            |previousTime           |currentTime            |timeDifference |
    |1234  |13                  |2017-07-20 10:49:30.027|2017-07-20 10:49:30.027|0              |
    |1234  |13                  |2017-07-20 10:04:23.1  |2017-07-20 10:04:23.1  |0              |
    |1234  |13                  |2017-07-20 10:04:23.1  |2017-07-20 10:06:23.897|120            |
    |1234  |13                  |2017-07-20 10:04:23.897|2017-07-20 10:04:23.897|0              |
    |1234  |13                  |2017-07-20 10:40:29.472|2017-07-20 10:51:17.427|658            |
    

The last part is the really tricky one, and I don't know where to start. Thank you.
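
For context, a frame shaped like diff_session is typically produced with a lag window over each userid/platform group. A minimal sketch, where the input name events is hypothetical and the column names are assumed from the sample above:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # order each user's events in time and look one row back
    w = Window.partitionBy("userid", "platform").orderBy("currentTime")
    diff_session = events \
        .withColumn("previousTime", F.lag("currentTime").over(w)) \
        .withColumn("timeDifference",
                    F.col("currentTime").cast("timestamp").cast("long")
                    - F.col("previousTime").cast("timestamp").cast("long"))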

Best Answer

Hope this helps!

import pyspark.sql.functions as func
from functools import reduce  # builtin in Python 2; needs this import on Python 3
from datetime import datetime, timedelta
from pyspark.sql.types import StringType

df = sc.parallelize([('1234','13','','2017-07-20 10:49:30.027',''),
                    ('1234','13','','2017-07-20 10:04:23.100',''),
                    ('1234','13','2017-07-20 10:04:23.100','2017-07-20 10:06:23.897',120),
                    ('1234','13','2017-07-20 10:04:23.897','2017-07-20 10:40:29.472',2166),
                    ('1234','13','2017-07-20 10:40:29.472','2017-07-20 10:40:50.347',11),
                    ('1234','13','2017-07-20 10:40:30.347','2017-07-20 10:51:16.458',646),
                    ('1234','13','2017-07-20 10:51:16.458','2017-07-20 10:51:17.427',1),
                    ('7777','44','2017-07-20 10:31:16.458','2017-07-20 10:47:16.458',1000),
                    ('7777','44','2017-07-20 11:11:16.458','2017-07-20 11:36:16.458',1500),
                    ('678','56','2017-07-20 10:51:16.458','2017-07-20 10:51:36.458',20),
                    ('678','56','2017-07-20 10:51:16.458','2017-07-20 10:51:26.458',10)
                    ]).\
    toDF(['userid','platform','previousTime','currentTime','timeDifference'])
df.show()

# missing value & outlier treatment
df1 = df.select("userid","platform", func.when(df.timeDifference=='', df.currentTime).otherwise(df.previousTime),
                func.when(df.timeDifference > 2000, df.previousTime).otherwise(df.currentTime),
                func.when(df.timeDifference=='', 0).when(df.timeDifference > 2000, 0).otherwise(df.timeDifference))
# rename the auto-generated when()/otherwise() column names back to the originals
oldColumns = df1.schema.names
newColumns = ["userid", "platform", "previousTime", "currentTime", "timeDifference"]
df1 = reduce(lambda df1, idx: df1.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df1)
df1.show()

# first part of result i.e. records where timeDifference = 0
df_final_part0 = df1.where("timeDifference = 0")

# identify records where sum(timeDifference) < 2000
df2 = df1.where("timeDifference <> 0")
df3 = df2.groupby("userid","platform").agg(func.sum("timeDifference")).\
    withColumnRenamed("sum(timeDifference)", "sum_timeDifference").where("sum_timeDifference < 2000")

# second part of result i.e. records where sum(timeDifference) is >= 2000
df_final_part1 = df2.join(df3, ["userid","platform"],"leftanti")
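# ("leftanti" keeps the df2 rows with no match in df3, i.e. the groups whose summed timeDifference is >= 2000)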

# third part of result
df_final_part2 = df2.join(df3,on=['userid','platform']).select('userid','platform',"previousTime","sum_timeDifference").\
    groupBy('userid','platform',"sum_timeDifference").agg(func.min("previousTime")).\
    withColumnRenamed("min(previousTime)", "previousTime").withColumnRenamed("sum_timeDifference", "timeDifference")
def processdate(x, time_in_sec):
    # shift the timestamp string x forward by time_in_sec seconds
    x = datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
    x += timedelta(milliseconds=time_in_sec * 1e3)
    return x.strftime('%Y-%m-%d %H:%M:%S.%f')
f1 = func.udf(processdate,StringType())
df_final_part2 = df_final_part2.withColumn("currentTime",f1(df_final_part2.previousTime,df_final_part2.timeDifference)).\
    select('userid','platform',"previousTime","currentTime","timeDifference")

# combine all three parts to get the final result
result = df_final_part0.unionAll(df_final_part1).unionAll(df_final_part2)
result.show()
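
To sanity-check the date arithmetic in processdate outside of Spark, it can be called directly; the values below are taken from the 678/56 group above, whose gaps sum to 30 seconds:

print(processdate('2017-07-20 10:51:16.458', 30))
# 2017-07-20 10:51:46.458000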


Don't forget to let us know if it solved your problem :)

The original question, "session - pyspark calculate session duration grouped by user id", can be found on Stack Overflow: https://stackoverflow.com/questions/45357833/
