apache-spark - 在 Pyspark 中将时间戳更改为 UTC 格式

标签 apache-spark pyspark spark-dataframe

我有一个输入数据帧( ip_df ),该数据帧中的数据如下所示:

id            timestamp_value
1       2017-08-01T14:30:00+05:30
2       2017-08-01T14:30:00+06:30
3       2017-08-01T14:30:00+07:30

我需要创建一个新的数据帧( op_df ),其中我需要将时间戳值转换为 UTC 格式。所以最终输出数据帧将如下所示:
id            timestamp_value
1       2017-08-01T09:00:00+00:00
2       2017-08-01T08:00:00+00:00
3       2017-08-01T07:00:00+00:00

我想使用 PySpark 实现它。有人可以帮我吗?任何帮助都会得到帮助。

最佳答案

如果您绝对需要完全按照指示格式化时间戳,即时区表示为“+00:00”,我认为使用 UDF 作为 already suggested是您最好的选择。

但是,如果您可以容忍时区略有不同的表示,例如无论是“+0000”(无冒号分隔符)还是“Z”,都可以在没有 UDF 的情况下执行此操作,根据数据集的大小,它的性能可能会更好。

鉴于以下数据表示

+---+-------------------------+
|id |timestamp_value          |
+---+-------------------------+
|1  |2017-08-01T14:30:00+05:30|
|2  |2017-08-01T14:30:00+06:30|
|3  |2017-08-01T14:30:00+07:30|
+---+-------------------------+

由:
l = [(1, '2017-08-01T14:30:00+05:30'), (2, '2017-08-01T14:30:00+06:30'), (3, '2017-08-01T14:30:00+07:30')]
ip_df = spark.createDataFrame(l, ['id', 'timestamp_value'])

哪里timestamp_valueString ,您可以执行以下操作(这里使用 to_timestampsession local timezone support,它们是在 Spark 2.2 中引入的):
from pyspark.sql.functions import to_timestamp, date_format
spark.conf.set('spark.sql.session.timeZone', 'UTC')
op_df = ip_df.select(
    date_format(
        to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
        "yyyy-MM-dd'T'HH:mm:ssZ"
    ).alias('timestamp_value'))

产生:
+------------------------+
|timestamp_value         |
+------------------------+
|2017-08-01T09:00:00+0000|
|2017-08-01T08:00:00+0000|
|2017-08-01T07:00:00+0000|
+------------------------+

或者,略有不同:
op_df = ip_df.select(
    date_format(
        to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
        "yyyy-MM-dd'T'HH:mm:ssXXX"
    ).alias('timestamp_value'))

产生:
+--------------------+
|timestamp_value     |
+--------------------+
|2017-08-01T09:00:00Z|
|2017-08-01T08:00:00Z|
|2017-08-01T07:00:00Z|
+--------------------+

关于apache-spark - 在 Pyspark 中将时间戳更改为 UTC 格式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45434538/

相关文章:

Pyspark-将列表/元组传递给toDF函数

scala - 从Apache Spark填充Elasticsearch日期

apache-spark - Spark Streaming - DStream 没有 distinct()

scala - Spark CSV 包无法处理字段中的\n

scala - Spark 将 DataFrame API 中的所有 NaN 替换为 null

scala - 我该如何解决 "need struct type but got struct"

apache-spark - Google Dataflow与Apache Storm

apache-spark - 计算pyspark中每组成对连续行之间的时间差

docker - 使用 spark-on-k8s-operator 在 Kubernetes 上运行的 Pyspark 的依赖性问题

scala - Spark avro 到 Parquet