python - 按时间间隔按 Pyspark Dataframe 分组

标签 python pyspark

我有一个为其生成了时间戳的数据框:

 from pyspark.sql.functions import avg, first

 rdd = sc.parallelize(
[
    (0, "A", 223,"201603_170302", "PORT"), 
    (0, "A", 22,"201602_100302", "PORT"), 
    (0, "A", 422,"201601_114300", "DOCK"), 
    (1,"B", 3213,"201602_121302", "DOCK")
]
)
 df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

所以我可以生成一个datetime:

 dt_parse = udf(lambda x: datetime.strptime(x,"%Y%m%d_%H%M%S")
 df_data = df_data.withColumn('datetime', dt_parse(df_data.date))

但现在我需要每天按 6 小时的间隔进行分组。每小时大概是

 df_data.groupby(hour(df_data.datetime)).agg(count(ship).alias(ship)).show()

但这不适用于小时以外的其他时间间隔。有办法吗?

最佳答案

这对我有用。

import pyspark.sql.functions

# ...

interval = 60 * 60 * 6    # 6 hours
gdf = dataframe.withColumn(
    'time_interval',
    pyspark.sql.functions.from_unixtime(pyspark.sql.functions.floor(pyspark.sql.functions.unix_timestamp(dataframe[obj['field']]) / interval) * interval)
).groupBy('time_interval')
# and then something like gdf.agg(...); gdf.collect()

关于python - 按时间间隔按 Pyspark Dataframe 分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39129733/

相关文章:

javascript - JSONify 返回奇怪的值

python - Python 中 Podio 文件上传失败

python - 无法在 Django 中使用 Popen 传递服务器密码?

python - 我可以让 pytest doctest 模块忽略一个文件吗?

Python Pandas : How to make a column row dependent on it's previous rows, 可能有一个函数吗?

csv - 将 Spark DataFrame 的内容保存为单个 CSV 文件

python - GCP Dataproc 自定义镜像 Python 环境

apache-spark - Web UI 如何计算存储内存(在 Executors 选项卡中)?

apache-spark - Py4JError : An error occurred while calling o90. 适合

python - 在 Spark Dataframe 中跨多行 json 字符串统一架构