apache-spark - java.lang.UnsupportedOperationException : Error in spark when writing

Tags: apache-spark apache-spark-dataset

When I try to write a Dataset to a Parquet file, I get the following error:

18/11/05 06:25:43 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 1.0 failed 4 times, most recent failure: Lost task 84.3 in stage 1.0 (TID 989, ip-10-253-194-207.nonprd.aws.csp.net, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
        at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

But when I call dataset.show(), I can view the data. I'm not sure where to look for the root cause.

Best Answer

I ran into the same problem, and in my case it was caused by a schema difference between the Parquet files:

Given this Parquet directory containing a few files:

  • /user/user1/parquet_table/part-00000-1e73689f-69e5-471a-8510-1547d108fea3-c000.parquet
  • /user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet

  • When I tried to merge them (spark2-shell):
    val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-*.parquet")
    val parquetFileDFCoal = parquetFileDF.coalesce(8)
    parquetFileDFCoal.write.parquet("/tmp/testTemp/0001")
    

    I got this exception:
    [Stage 4:> (0 + 8) / 8]20/05/13 17:09:03 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 4.0 (TID 116, node.localhost.localdomain, executor 70): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    ...
    Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
    at parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    

    If you inspect each file with spark2-shell, you may find the schema difference. Here:
    scala> val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-1e73689f-69e5-471a-8510-1547d108fea3-c000.parquet")
    parquetFileDF: org.apache.spark.sql.DataFrame = [root_id: string, father_id: string ... 7 more fields]
    
    scala> parquetFileDF.printSchema()
    root
    |-- root_id: string (nullable = true)
    |-- father_id: string (nullable = true)
    |-- self_id: string (nullable = true)
    |-- group_name: string (nullable = true)
    |-- father_name: string (nullable = true)
    |-- cle: string (nullable = true)
    |-- value: integer (nullable = true)
    
    
    scala> val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet ")
    parquetFileDF: org.apache.spark.sql.DataFrame = [root_id: string, father_id: string ... 7 more fields]
    
    scala> parquetFileDF.printSchema()
    root
    |-- root_id: string (nullable = true)
    |-- father_id: string (nullable = true)
    |-- self_id: string (nullable = true)
    |-- group_name: string (nullable = true)
    |-- father_name: string (nullable = true)
    |-- cle: string (nullable = true)
    |-- value: string (nullable = true)
    

    You can see that the value field is an Integer in one file and a String in the other.
    To fix it, you have to cast one of the files so that the types match and rewrite it, as sketched below.
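    A minimal sketch in spark2-shell, assuming the paths and the value column from the listing above (the output path /tmp/parquet_table_fixed/0001 is just an example):
    // Read the file whose `value` column was written as a string
    val stringDF = spark.read.parquet("/user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet")
    // Cast `value` to integer so it matches the other file's schema
    import org.apache.spark.sql.functions.col
    val fixedDF = stringDF.withColumn("value", col("value").cast("int"))
    // Write the corrected copy to a temporary location (example path),
    // then replace the original part file with it
    fixedDF.write.parquet("/tmp/parquet_table_fixed/0001")

    Once both files agree on value: integer, reading the whole directory and coalescing should no longer hit the decodeToInt error.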

    Regarding apache-spark - java.lang.UnsupportedOperationException : Error in spark when writing, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53149457/
