python - Exception when trying to collect records from a Parquet file in PySpark

Tags: python apache-spark apache-spark-sql pyspark

I don't understand why, but I can't read data from a Parquet file. I created the Parquet file from a JSON file and read it into a DataFrame:

df.printSchema()

 |-- param: struct (nullable = true)
 |    |-- FORM: string (nullable = true)
 |    |-- URL: string (nullable = true)

When I try to read any record, I get an error:

df.select("param").first()

15/07/22 13:06:15 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 4)
java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
        at parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
        at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
        at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:153)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED, OPTIONAL, REPEATED] got utm_medium at line 29:     optional binary amp;utm_medium
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
        ... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.Type.Repetition.UTM_MEDIUM
        at java.lang.Enum.valueOf(Enum.java:238)
        at parquet.schema.Type$Repetition.valueOf(Type.java:70)
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
        ... 27 more
15/07/22 13:06:15 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 4, localhost): java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
        at parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
        at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
        at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:153)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED, OPTIONAL, REPEATED] got utm_medium at line 29:     optional binary amp;utm_medium
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
        ... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.Type.Repetition.UTM_MEDIUM
        at java.lang.Enum.valueOf(Enum.java:238)
        at parquet.schema.Type$Repetition.valueOf(Type.java:70)
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
        ... 27 more

15/07/22 13:06:15 ERROR TaskSetManager: Task 0 in stage 8.0 failed 1 times; aborting job
15/07/22 13:06:15 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
15/07/22 13:06:15 INFO TaskSchedulerImpl: Cancelling stage 8
15/07/22 13:06:15 INFO DAGScheduler: ShuffleMapStage 8 (first at <ipython-input-8-5cb9a7b45630>:1) failed in 0.083 s
15/07/22 13:06:15 INFO DAGScheduler: Job 4 failed: first at <ipython-input-8-5cb9a7b45630>:1, took 0.159103 s
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-8-5cb9a7b45630> in <module>()
----> 1 df.select("param").first()

/home/vagrant/spark/python/pyspark/sql/dataframe.pyc in first(self)
    676         Row(age=2, name=u'Alice')
    677         """
--> 678         return self.head()
    679 
    680     @ignore_unicode_prefix

/home/vagrant/spark/python/pyspark/sql/dataframe.pyc in head(self, n)
    664         """
    665         if n is None:
--> 666             rs = self.head(1)
    667             return rs[0] if rs else None
    668         return self.take(n)

/home/vagrant/spark/python/pyspark/sql/dataframe.pyc in head(self, n)
    666             rs = self.head(1)
    667             return rs[0] if rs else None
--> 668         return self.take(n)
    669 
    670     @ignore_unicode_prefix

/home/vagrant/spark/python/pyspark/sql/dataframe.pyc in take(self, num)
    338         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    339         """
--> 340         return self.limit(num).collect()
    341 
    342     @ignore_unicode_prefix

/home/vagrant/spark/python/pyspark/sql/dataframe.pyc in collect(self)
    312         """
    313         with SCCallSiteSync(self._sc) as css:
--> 314             port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
    315         rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    316         cls = _create_cls(self.schema)

/home/vagrant/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/vagrant/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 4, localhost): java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
        at parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
        at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
        at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:153)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
        at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED, OPTIONAL, REPEATED] got utm_medium at line 29:     optional binary amp;utm_medium
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
        at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
        at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
        at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
        ... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.Type.Repetition.UTM_MEDIUM
        at java.lang.Enum.valueOf(Enum.java:238)
        at parquet.schema.Type$Repetition.valueOf(Type.java:70)
        at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
        ... 27 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Can anyone help me solve this? What am I doing wrong?

Best answer

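Is there any chance you renamed the columns? I got the same error when I had a column named "SUM(X)", even after renaming it. Changing the column name before saving as a Parquet file solved it for me; see http://trustedanalytics.github.io/atk/versions/v0.4.1/errata.html.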
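
The "Caused by" line in the trace (`optional binary amp;utm_medium`) suggests that a nested field under `param` is literally named `amp;utm_medium`, and that the schema parser appears to treat the `;` as the end of the field definition. If that is the cause, one option is to clean the field names before the DataFrame is created and written to Parquet. The following is only a minimal sketch under that assumption: the helper names (`sanitize_name`, `sanitize_keys`, `clean_json_line`) and the set of characters treated as unsafe are hypothetical choices for illustration, not part of Spark or Parquet.

```python
import json
import re

# Assumption: these characters are treated as unsafe in field names.
# The set is a guess based on the error message, not an official list.
_UNSAFE_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def sanitize_name(name):
    """Replace every assumed-unsafe character in a field name with '_'."""
    return _UNSAFE_CHARS.sub("_", name)


def sanitize_keys(value):
    """Recursively rename dict keys, including keys of nested structs and lists.

    Note: two different keys that sanitize to the same name would collide,
    and the later one would silently win.
    """
    if isinstance(value, dict):
        return {sanitize_name(k): sanitize_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize_keys(v) for v in value]
    return value


def clean_json_line(line):
    """Parse one JSON record, clean its keys, and serialize it again."""
    return json.dumps(sanitize_keys(json.loads(line)))


if __name__ == "__main__":
    record = '{"param": {"FORM": "a", "amp;utm_medium": "cpc"}}'
    print(clean_json_line(record))
```

With a helper like this, each line of the source JSON could be passed through `clean_json_line` (for example by mapping it over an RDD of text lines) before the DataFrame is built and saved, so that the Parquet schema never contains the problematic names. Renaming after the Parquet file already exists would not help, which matches the experience described in the answer.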

Regarding "python - Exception when trying to collect records from a Parquet file in PySpark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/31564364/
