java - Spark SQL : Nested classes to parquet error

标签 java apache-spark apache-spark-sql parquet

我似乎无法写信给 parquet a JavaRDD<T>其中 T 表示 Person类(class)。我把它定义为

public class Person implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String name;
    private String age;
    private Address address;
....

Address :

public class Address implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String City; private String Block;
    ...<getters and setters>

然后我创建一个 JavaRDD像这样:

JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
    {
        public Person call(String line)
        {
            String[] parts = line.split(",");
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge("2");
            Address address = new Address("HomeAdd","141H");
            person.setAddress(address);
            return person;
        }
    });

注意 - 我手动设置 Address所有人都一样。这基本上是一个嵌套的 RDD。在尝试将其保存为 Parquet 文件时:

DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");    

地址类是:

import java.io.Serializable;
public class Address implements Serializable
{
    public Address(String city, String block)
    {
        super();
        City = city;
        Block = block;
    }
    private static final long serialVersionUID = 1L;
    private String City;
    private String Block;
    //Omitting getters and setters
}

我遇到错误:

引起:java.lang.ClassCastException:com.test.schema.Address cannot be cast to org.apache.spark.sql.Row

我正在运行 spark-1.4.1。

  • 这是一个已知错误吗?
  • 如果我通过导入相同格式的嵌套 JSON 文件来执行相同操作,我就可以保存到 parquet。
  • 即使我创建了一个子 DataFrame,如:DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable");我仍然遇到同样的错误

那么是什么给了?如何从文本文件中读取复杂的数据结构并另存为 parquet?看来我不能这样做。

最佳答案

您正在使用有限制的 java api

来自 spark 文档: http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds

Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。使用反射获得的 BeanInfo 定义了表的模式。目前,Spark SQL 不支持包含嵌套或复杂类型(如 Lists 或 Arrays)的 JavaBeans。您可以创建一个 JavaBean,方法是创建一个实现 Serializable 并为其所有字段提供 getter 和 setter 的类。 对于 Scala 案例类,它将起作用(更新为写入 Parquet 格式)

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

case class Address(city:String, block:String);
case class Person(name:String,age:String, address:Address);
object Test2 {
  def main(args: Array[String]): Unit = {

     val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
      val sc = new SparkContext(conf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc);
      import sqlContext.implicits._
      val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d"))));

      val df  = sqlContext.createDataFrame(people);
      df.write.mode("overwrite").parquet("/tmp/people.parquet")
  }
}

关于java - Spark SQL : Nested classes to parquet error,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35823213/

相关文章:

python - Pyspark:将列中的 json 分解为多列

scala - 将数据帧转换为密集向量 Spark

scala - 通过 Spark 读取文件夹中保存的所有 Parquet 文件

python - 在 pyspark 数据框中循环两列时向新列添加值

java - 使用 Clojure 需要哪些 Java 组件?

java - 将 java 函数 URLDecoder.decode 应用于 Spark 3 中的整个列

java - 每次用 java 检查一个框时更新所有 TextFields

apache-spark - 在多列上应用窗口函数

java - 二维的 toCharArray()

java - java ThreadPoolExecutor 中的workerCountOf()方法