python - 如何在 pyspark 中以秒为单位获取 datediff() ?

标签 python apache-spark pyspark datediff

我已经尝试了( this_post )中的代码,但无法获得以秒为单位的日期差异。我只是在下面的“Attributes_Timestamp_fix”和“lagged_date”列之间获取 datediff() 。 有什么提示吗? 下面是我的代码和输出。

eg = eg.withColumn("lagged_date", lag(eg.Attributes_Timestamp_fix, 1)
.over(Window.partitionBy("id")
.orderBy("Attributes_Timestamp_fix")))

eg = eg.withColumn("time_diff", 
datediff(eg.Attributes_Timestamp_fix, eg.lagged_date))

        id      Attributes_Timestamp_fix time_diff
0   3.531611e+14    2018-04-01 00:01:02 NaN
1   3.531611e+14    2018-04-01 00:01:02 0.0
2   3.531611e+14    2018-04-01 00:03:13 0.0
3   3.531611e+14    2018-04-01 00:03:13 0.0
4   3.531611e+14    2018-04-01 00:03:13 0.0
5   3.531611e+14    2018-04-01 00:03:13 0.0

最佳答案

pyspark.sql.functions中,有一个函数datediff不幸的是只计算天数差异。为了克服这个问题,您可以将两个日期转换为 unix 时间戳(以秒为单位)并计算差异。

让我们创建一些示例数据,计算滞后,然后计算差异(以秒为单位)。

from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql.window import Window
import datetime

d = [{'id' : 1, 't' : datetime.datetime(2018,01,01)},\
 {'id' : 1, 't' : datetime.datetime(2018,01,02)},\
 {'id' : 1, 't' : datetime.datetime(2018,01,04)},\
 {'id' : 1, 't' : datetime.datetime(2018,01,07)}]

df = spark.createDataFrame(d)
df.show()
+---+-------------------+
| id|                  t|
+---+-------------------+
|  1|2018-01-01 00:00:00|
|  1|2018-01-02 00:00:00|
|  1|2018-01-04 00:00:00|
|  1|2018-01-07 00:00:00|
+---+-------------------+

w = Window.partitionBy('id').orderBy('t')
df.withColumn("previous_t", lag(df.t, 1).over(w))\
  .select(df.t, (unix_timestamp(df.t) - unix_timestamp(col('previous_t'))).alias('diff'))\
  .show()

+-------------------+------+
|                  t|  diff|
+-------------------+------+
|2018-01-01 00:00:00|  null|
|2018-01-02 00:00:00| 86400|
|2018-01-04 00:00:00|172800|
|2018-01-07 00:00:00|259200|
+-------------------+------+

关于python - 如何在 pyspark 中以秒为单位获取 datediff() ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55072513/

相关文章:

python - 使用关键字查找项目

python - 如何循环 python 命令一定次数?

apache-spark - 发送Spark流指标以打开tsdb

python - 将 Python 脚本转换为能够在 Spark/Hadoop 中运行

hadoop - 使用 Apache Spark 的 YARN 压缩编解码器

python - 过滤 Django 反向引用

python - 如何生成等距插值

python - Pyspark 错误 : "Py4JJavaError: An error occurred while calling o655.count." when calling count() method on dataframe

python - 如何将常量值传递给 Python UDF?

json - 从 Hive 表中的 json 字符串中提取值