我不确定我是否做错了什么,如果这看起来很幼稚,请原谅我,
我的问题可以通过以下数据重现
from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 5 2013 12:00AM')]).toDF()
我创建了一个函数来将此日期字符串解析为日期时间对象以进一步处理
from datetime import datetime
def date_convert(date_str):
date_format = '%b %d %Y %I:%M%p'
try:
dt=datetime.strptime(date_str,date_format)
except ValueError,v:
if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
dt = dt[:-(len(v.args[0])-26)]
dt=datetime.strptime(dt,date_format)
else:
raise v
return dt
现在,如果我从中制作一个 UDF 并应用于我的数据框,我会得到意想不到的数据
from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
结果如下
Out[40]:
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
但是如果我在将数据帧作为 RDD 之后使用它,那么它会返回一个 pythond datetime 对象
df.rdd.map(lambda row:date_convert(row.C3)).collect()
(1) Spark Jobs
Out[42]:
[datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 5, 0, 0)]
我想用 dataframe 实现类似的事情。我该怎么做以及这种方法有什么问题(UDF over dataframe)
最佳答案
因为你要设置你的UDF
的返回类型数据.显然您正在尝试获取 timestamps
,如果是这种情况,你必须写这样的东西。
from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())
关于apache-spark - 为什么 python UDF 返回意外的日期时间对象,而在 RDD 上应用的相同函数给出了正确的日期时间对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39135490/