PySpark 毫秒的时间戳

标签 pyspark

我正在尝试获取两个时间戳列之间的差异,但毫秒数消失了。

如何纠正这个?

from pyspark.sql.functions import unix_timestamp
timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"

data = [
    (1, '2018-07-25 17:15:06.39','2018-07-25 17:15:06.377'),
    (2,'2018-07-25 11:12:49.317','2018-07-25 11:12:48.883')

]

df = spark.createDataFrame(data, ['ID', 'max_ts','min_ts']).withColumn('diff',F.unix_timestamp('max_ts', format=timeFmt) - F.unix_timestamp('min_ts', format=timeFmt))
df.show(truncate = False)

最佳答案

这是 unix_timestamp 的预期行为- 它在 source code docstring 中明确说明它只返回秒,因此在计算时删除毫秒组件。

如果您想进行该计算,可以使用 substring函数来连接数字然后做差异。请参阅下面的示例。请注意,这假设数据完全形成,例如毫秒完全满足(所有 3 位数字):

import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
data = [
    (1, '2018-07-25 17:15:06.390', '2018-07-25 17:15:06.377'),  # note the '390'
    (2, '2018-07-25 11:12:49.317', '2018-07-25 11:12:48.883')
]

df = spark.createDataFrame(data, ['ID', 'max_ts', 'min_ts'])\
    .withColumn('max_milli', F.unix_timestamp('max_ts', format=timeFmt) + F.substring('max_ts', -3, 3).cast('float')/1000)\
    .withColumn('min_milli', F.unix_timestamp('min_ts', format=timeFmt) + F.substring('min_ts', -3, 3).cast('float')/1000)\
    .withColumn('diff', (F.col('max_milli') - F.col('min_milli')).cast('float') * 1000)

df.show(truncate=False)

+---+-----------------------+-----------------------+----------------+----------------+---------+
|ID |max_ts                 |min_ts                 |max_milli       |min_milli       |diff     |
+---+-----------------------+-----------------------+----------------+----------------+---------+
|1  |2018-07-25 17:15:06.390|2018-07-25 17:15:06.377|1.53255330639E9 |1.532553306377E9|13.000011|
|2  |2018-07-25 11:12:49.317|2018-07-25 11:12:48.883|1.532531569317E9|1.532531568883E9|434.0    |
+---+-----------------------+-----------------------+----------------+----------------+---------+

关于PySpark 毫秒的时间戳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54951348/

相关文章:

apache-spark - Spark : Find Each Partition Size for RDD

python - 给定开始日期和结束日期,生成中间所有周的最有效方法是什么?

python - 在 DataFrame 上应用映射函数

apache-spark - 如何使用 spark-csv 包在 HDFS 上仅读取 n 行大型 CSV 文件?

apache-spark - 为什么在本地模式下加入 spark 这么慢?

postgresql - 从 Dataframe 到 DB 的批量插入忽略 Pyspark 中的失败行

apache-spark - Spark 窗口函数中的条件

hadoop - 从pyspark连接HiveServer2

hadoop - 如何在 spark 中使用 `wholeTextFile` 保存来自 `saveATextFile` RDD 的结果?

PySpark 结构化流测试支持