java - How to read and write a custom class to a Parquet file

Tags: java apache-spark apache-spark-sql parquet

I am trying to write a Parquet read/write class for a specific class type, using DataFrame/Dataset.

Class schema:

class A {
  long count;
  List<B> listOfValues;
}
class B {
  String id;
  long count;
}

Code:

  String path = "some path";
  List<A> entries = somerandomAentries();
  JavaRDD<A> rdd = sc.parallelize(entries, 1);
  DataFrame df = sqlContext.createDataFrame(rdd, A.class);

  df.write().parquet(path);
  DataFrame newDataDF = sqlContext.read().parquet(path);
  newDataDF.show();

When I try to run this, it throws an error. What am I missing here? Do I need to provide a schema for the whole class when creating the DataFrame? Error:

    Caused by: scala.MatchError: B(Id=abc, count=0) (of class B)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
    ... 8 more

Best answer

You are getting the error because nested JavaBeans are not supported in Spark 1.6. See https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#inferring-the-schema-using-reflection

Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays.
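A common workaround on Spark 1.6 is to define the schema explicitly with a StructType and convert each bean to a Row yourself, so Spark never has to infer the nested bean. The sketch below is a minimal outline, not a drop-in implementation: it assumes the fields of A and B are accessible as written in the question (or via getters), and that sqlContext, rdd and path come from the original snippet.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Explicit schema for B: struct<id: string, count: long>
StructType bSchema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("count", DataTypes.LongType, false)
});

// Explicit schema for A: struct<count: long, listOfValues: array<struct<id, count>>>
StructType aSchema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("count", DataTypes.LongType, false),
    DataTypes.createStructField("listOfValues", DataTypes.createArrayType(bSchema), true)
});

// Convert each A bean to a Row, turning every nested B into a nested Row.
JavaRDD<Row> rows = rdd.map(new Function<A, Row>() {
    @Override
    public Row call(A a) {
        List<Row> bRows = new ArrayList<Row>();
        for (B b : a.listOfValues) {
            bRows.add(RowFactory.create(b.id, b.count));
        }
        return RowFactory.create(a.count, bRows);
    }
});

DataFrame df = sqlContext.createDataFrame(rows, aSchema);
df.write().parquet(path);

// Reading back gives generic Rows with the same nested structure, not A/B beans.
DataFrame newDataDF = sqlContext.read().parquet(path);
newDataDF.show();

Alternatively, you can flatten A into a bean that contains only simple types, or move to Spark 2.x, where the Dataset API with Encoders.bean generally handles nested JavaBeans.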

Regarding "java - How to read and write a custom class to a Parquet file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40048508/
