apache-spark - pyspark createDataFrame: string interpreted as timestamp, schema mixes up columns

Tags: apache-spark pyspark apache-spark-sql pyspark-sql

I have hit a very strange error with Spark DataFrames that causes a string to be evaluated as a timestamp.

Here is my setup code:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

new_schema = StructType([StructField("item_id", StringType(), True),
                         StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True)
                        ])

df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)

This gives me the following error:

AttributeError                            Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    307             Py4JJavaError: ...
    308         """
--> 309         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    310
    311     @since(1.3)

/home/florian/spark/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    522             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    523         else:
--> 524             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    525         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    526         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    397
    398         # convert python objects to sql data
--> 399         data = [schema.toInternal(row) for row in data]
    400         return self._sc.parallelize(data), schema
    401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    574             return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575         elif isinstance(obj, (tuple, list)):
--> 576             return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577         elif hasattr(obj, "__dict__"):
    578             d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in <genexpr>((f, v))
    574             return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
    575         elif isinstance(obj, (tuple, list)):
--> 576             return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
    577         elif hasattr(obj, "__dict__"):
    578             d = obj.__dict__

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj)
    434
    435     def toInternal(self, obj):
--> 436         return self.dataType.toInternal(obj)
    437
    438     def fromInternal(self, obj):

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt)
    188     def toInternal(self, dt):
    189         if dt is not None:
--> 190             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
    191                        else time.mktime(dt.timetuple()))
    192             return int(seconds * 1e6 + dt.microsecond)

AttributeError: 'str' object has no attribute 'tzinfo'



This looks like a string is being passed to TimestampType.toInternal().

What is really strange is that this DataFrame produces the same error:
df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)

while this one works:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)

And this one works as well:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)

To me, this suggests that pyspark is somehow putting the value from "item_id" into the "date" column and therefore producing this error.
Am I doing something wrong? Is this a bug in DataFrames?

Info:
I am using pyspark 2.0.1.

Edit:
df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()

Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', date=None, description=None)

Best Answer

When you create a Row object, the fields are sorted alphabetically (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row), so when you create a Row(description, date, item_id) object, it is actually ordered as (date, description, item_id).
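A quick sketch that makes the sorting visible (assuming pyspark 2.x, where keyword arguments to Row are sorted before the Row is built):

from datetime import datetime
from pyspark.sql import Row

# The keyword order in the call is (description, date, item_id), but the
# fields come back sorted alphabetically as (date, description, item_id).
r = Row(description='description', date=datetime.utcnow(), item_id='id_string')
print(r.__fields__)  # ['date', 'description', 'item_id']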

Since your schema is ordered StringType, TimestampType, StringType, creating a DataFrame from this Row and schema makes Spark map whatever is in date to a StringType, whatever is in description to a TimestampType, and whatever is in item_id to a StringType.
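One way to avoid the mismatch, sketched under the same assumption, is to declare the schema fields in the alphabetical order that Row sorts them into:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Schema listed alphabetically so each field lines up with the sorted Row.
new_schema = StructType([StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True),
                         StructField("item_id", StringType(), True)])

df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)

Alternatively, pass plain tuples whose element order matches the schema instead of keyword-built Row objects.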

Passing a timestamp (in datetime format) to a StringType does not cause an error, but passing a string to a TimestampType does, because it asks for the tzinfo attribute, which, as the error states, a str object does not have.
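The failing step can be reproduced in isolation by calling TimestampType.toInternal() directly, the same method at the bottom of the traceback (just a sketch, not something you would normally call yourself):

from datetime import datetime
from pyspark.sql.types import TimestampType, StringType

StringType().toInternal(datetime.utcnow())     # no error: StringType passes values through unchanged
TimestampType().toInternal(datetime.utcnow())  # fine: returns microseconds since the epoch
TimestampType().toInternal('id_string')        # AttributeError: 'str' object has no attribute 'tzinfo'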

Likewise, the reason the DataFrames that worked for you actually worked is that None was being passed to the TimestampType in the schema, and that is an acceptable value.
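That None is accepted can be checked the same way: TimestampType.toInternal() only converts non-null values and passes None through (again a sketch):

from pyspark.sql.types import TimestampType

print(TimestampType().toInternal(None))  # None: nulls skip the tzinfo check entirely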

Regarding apache-spark - pyspark createDataFrame: string interpreted as timestamp, schema mixes up columns, the original question can be found on Stack Overflow: https://stackoverflow.com/questions/42026089/
