I am using the MongoDB-Hadoop connector to read a collection that contains embedded documents.
JSON collection: PersonaMetaData
{
"user_id" : NumberLong(2),
"persona_created" : true,
"persona_createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
"persona" : [{"persona_type" : 1,
"created_using_algo" : "Name of the algo",
"version_algo" : "1.0",
"createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
"persona_items": {"key1":"value1", "key2": "value2"} }]
}
I created the following classes to represent the data in the collection:
class Persona_Items implements Serializable
{
private String key1;
private String key2;
// Getter/Setter and constructor
}
class Persona implements Serializable
{
String persona_type;
String created_using_algo;
String version_algo;
long createdAt;
List<Persona_Items> listPersonaItems;
// Getter/setter and constructor
}
class PersonaMetaData implements Serializable
{
long user_id;
boolean persona_created;
long persona_createdAt;
List<Persona> listPersona;
// Getter/setter and constructor
}
It is then used as follows:
// RDD representing the complete collection
JavaPairRDD<Object, BSONObject> bsonRdd = sc.newAPIHadoopRDD(inputConfig,
com.mongodb.hadoop.MongoInputFormat.class,
Object.class, BSONObject.class);
// Get RDD of PersonaMetaData
JavaRDD<PersonaMetaData> metaDataSchemaJavaRDD =
    bsonRdd.map(new Function<Tuple2<Object, BSONObject>, PersonaMetaData>() {
        @Override
        public PersonaMetaData call(Tuple2<Object, BSONObject> objectBSONObjectTuple2)
                throws Exception {
            // Parse the BSON object and return a new PersonaMetaData object
        }
    });
// Convert into DataFrame
DataFrame dataFrame = sqlContext.createDataFrame(metaDataSchemaJavaRDD,
    PersonaMetaData.class);
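The elided `call(...)` body above has to pull each field out of the BSON document by hand. Below is a minimal sketch of that manual extraction, with two simplifications that are mine, not from the question: a plain `Map<String, Object>` stands in for the `BSONObject` interface (both expose `get(String)`), and the beans are reduced to public fields for brevity (real JavaBeans would keep the getters/setters):

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

public class BsonParseSketch {

    // Reduced versions of the question's beans (public fields, no getters,
    // purely to keep the sketch short).
    public static class Persona {
        public int personaType;
        public String createdUsingAlgo;
        public long createdAt;
        public Map<String, String> personaItems;
    }

    public static class PersonaMetaData {
        public long userId;
        public boolean personaCreated;
        public List<Persona> listPersona = new ArrayList<>();
    }

    // The kind of manual extraction the call(...) body needs to perform;
    // field names follow the collection shown in the question.
    @SuppressWarnings("unchecked")
    public static PersonaMetaData parse(Map<String, Object> doc) {
        PersonaMetaData meta = new PersonaMetaData();
        meta.userId = ((Number) doc.get("user_id")).longValue();
        meta.personaCreated = (Boolean) doc.get("persona_created");
        List<Map<String, Object>> personas =
            (List<Map<String, Object>>) doc.get("persona");
        if (personas != null) {
            for (Map<String, Object> p : personas) {
                Persona persona = new Persona();
                persona.personaType = ((Number) p.get("persona_type")).intValue();
                persona.createdUsingAlgo = (String) p.get("created_using_algo");
                // ISODate values arrive as java.util.Date; store as epoch millis.
                persona.createdAt = ((Date) p.get("createdAt")).getTime();
                persona.personaItems = (Map<String, String>) p.get("persona_items");
                meta.listPersona.add(persona);
            }
        }
        return meta;
    }
}
```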
Exception:
scala.MatchError: io.abc.spark.schema.PersonaMetaData@31ff5060 (of class io.abc.spark.schema.PersonaMetaData)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:498)
If the classes contain no List fields, everything runs without any issues.
Best Answer
As clearly stated in the Inferring the Schema Using Reflection section of the Spark SQL, DataFrames and Datasets Guide:
Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays.
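Given that limitation, the two usual workarounds on Spark 1.x are to build the schema by hand (an `RDD<Row>` plus an explicit `StructType`) or to flatten the nested beans into a JavaBean with only simple fields before calling `createDataFrame`. Below is a minimal sketch of the flattening route; the class and field names (`PersonaRow`, `flatten`) are illustrative, not from the question. Each `(persona, item)` pair becomes one flat row, so the reflected schema contains no List or Map fields:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FlattenSketch {

    // Flat JavaBean: only simple field types, so Spark's reflection-based
    // schema inference can handle it.
    public static class PersonaRow implements Serializable {
        private long userId;
        private int personaType;
        private String itemKey;
        private String itemValue;
        public long getUserId() { return userId; }
        public void setUserId(long v) { userId = v; }
        public int getPersonaType() { return personaType; }
        public void setPersonaType(int v) { personaType = v; }
        public String getItemKey() { return itemKey; }
        public void setItemKey(String v) { itemKey = v; }
        public String getItemValue() { return itemValue; }
        public void setItemValue(String v) { itemValue = v; }
    }

    // Emit one flat row per (persona, item) pair. With a
    // JavaRDD<PersonaMetaData> this logic would run inside a flatMap,
    // followed by sqlContext.createDataFrame(flatRdd, PersonaRow.class).
    public static List<PersonaRow> flatten(long userId, int personaType,
                                           Map<String, String> personaItems) {
        List<PersonaRow> rows = new ArrayList<>();
        for (Map.Entry<String, String> e : personaItems.entrySet()) {
            PersonaRow row = new PersonaRow();
            row.setUserId(userId);
            row.setPersonaType(personaType);
            row.setItemKey(e.getKey());
            row.setItemValue(e.getValue());
            rows.add(row);
        }
        return rows;
    }
}
```

The trade-off is that the nested structure is denormalized into repeated rows, which can then be regrouped in SQL if needed.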
Regarding java - Creating a DataFrame out of nested user-defined objects, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/35986157/