我有一个具有此架构的 PySpark 数据框:
root
|-- epoch: double (nullable = true)
|-- var1: double (nullable = true)
|-- var2: double (nullable = true)
其中纪元以秒为单位,应转换为日期时间。为此,我定义了一个用户定义函数 (udf),如下所示:
from pyspark.sql.functions import udf
import time
def epoch_to_datetime(x):
return time.localtime(x)
# return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x))
# return x * 0 + 1
epoch_to_datetime_udf = udf(epoch_to_datetime, DoubleType())
df.withColumn("datetime", epoch_to_datetime(df2.epoch)).show()
我收到这个错误:
---> 21 return time.localtime(x)
22 # return x * 0 + 1
23
TypeError: a float is required
如果我只是在函数中返回 x + 1
,它就可以工作。尝试 float(x)
或 float(str(x))
或 numpy.float(x)
在 time.localtime(x )
没有帮助,我仍然收到错误消息。在 udf
之外,time.localtime(1.514687216E9)
或其他数字工作正常。使用 datetime
包将 epoch
转换为 datetim 会导致类似的错误。
似乎 time
和 datetime
包不喜欢使用 PySpark 的 DoubleType
。有什么想法可以解决这个问题吗?谢谢。
最佳答案
你不需要 udf
函数
您只需要将双纪元列转换为timestampType()
,然后使用data_format
函数,如下所示
from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.date_format(df.epoch.cast(dataType=t.TimestampType()), "yyyy-MM-dd"))
这会给你一个字符串日期
root
|-- epoch: string (nullable = true)
|-- var1: double (nullable = true)
|-- var2: double (nullable = true)
你可以使用to_date
函数如下
from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.to_date(df.epoch.cast(dataType=t.TimestampType())))
这会给你 date
作为 datatype
到 epoch
列
root
|-- epoch: date (nullable = true)
|-- var1: double (nullable = true)
|-- var2: double (nullable = true)
希望回答对你有帮助
关于python - 使用udf将PySpark数据框中的纪元转换为日期时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49971903/