python - 使用udf将PySpark数据框中的纪元转换为日期时间

标签 python apache-spark pyspark apache-spark-sql

我有一个具有此架构的 PySpark 数据框:

root
 |-- epoch: double (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)

其中纪元以秒为单位,应转换为日期时间。为此,我定义了一个用户定义函数 (udf),如下所示:

from pyspark.sql.functions import udf    
import time
def epoch_to_datetime(x):
    return time.localtime(x)
    # return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x))
    # return x * 0 + 1

epoch_to_datetime_udf = udf(epoch_to_datetime, DoubleType())
df.withColumn("datetime", epoch_to_datetime(df2.epoch)).show()

我收到这个错误:

---> 21     return time.localtime(x)
    22     # return x * 0 + 1
    23 
    TypeError: a float is required

如果我只是在函数中返回 x + 1,它就可以工作。尝试 float(x)float(str(x))numpy.float(x)time.localtime(x ) 没有帮助,我仍然收到错误消息。在 udf 之外,time.localtime(1.514687216E9) 或其他数字工作正常。使用 datetime 包将 epoch 转换为 datetim 会导致类似的错误。

似乎 timedatetime 包不喜欢使用 PySpark 的 DoubleType。有什么想法可以解决这个问题吗?谢谢。

最佳答案

你不需要 udf 函数

您只需要将双纪元列转换为timestampType(),然后使用data_format 函数,如下所示

from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.date_format(df.epoch.cast(dataType=t.TimestampType()), "yyyy-MM-dd"))

这会给你一个字符串日期

root
 |-- epoch: string (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)

你可以使用to_date函数如下

from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.to_date(df.epoch.cast(dataType=t.TimestampType())))

这会给你 date 作为 datatypeepoch

root
 |-- epoch: date (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)

希望回答对你有帮助

关于python - 使用udf将PySpark数据框中的纪元转换为日期时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49971903/

相关文章:

python - 运行 spark 作业 : python vs spark. 提交

apache-spark - 星火迷你集群

python - 如何在同一个 Spark 项目中同时使用 Scala 和 Python?

apache-spark - Pyspark 内存问题

python - Numpy python找到每列的最小值并从每列中减去这个值

python - OpenAI GPT-3 API 错误 : "AttributeError: module ' openai' has no attribute 'GPT' "

Python完美数计算

python - 使用 UDF 加入 Pyspark Dataframe

python - 将字符串列表转换为 Python 数据框 - pyspark python Sparksql

python - 从 sklearn.model_selection.GridSearchCV 获取 keyerror