apache-spark - 将类型化 JavaRDD 转换为行 JavaRDD

标签 apache-spark dataframe rdd

我正在尝试将类型化rdd转换为行rdd,然后从中创建数据框。当我执行代码时,它抛出异常

代码:

JavaRDD<Counter> rdd = sc.parallelize(counters);
JavaRDD<Row> rowRDD = rdd.map((Function<Counter, Row>) RowFactory::create);

//I am using some schema here based on the class Counter
DataFrame df = sqlContext.createDataFrame(rowRDD, getSchema());
marineDF.show(); //throws Exception 

从类型化 rdd 到行 rdd 的转换是否保留行工厂中的顺序?如果不是,我如何确定这一点?

类代码:

class Counter {
  long vid;
  byet[] bytes; 
  List<B> blist;
}
class B {
  String id;
  long count;
}

架构:

private StructType getSchema() {
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("vid", DataTypes.LongType, false));
fields.add(DataTypes.createStructField("bytes",DataTypes.createArrayType(DataTypes.ByteType), false));

List<StructField> bFields = new ArrayList<>();
bFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
bFields.add(DataTypes.createStructField("count", DataTypes.LongType, false));

StructType bclasSchema = DataTypes.createStructType(bFields);

fields.add(DataTypes.createStructField("blist", DataTypes.createArrayType(bclasSchema, false), false));
StructType schema = DataTypes.createStructType(fields);
return schema;
}

失败但有异常:

java.lang.ClassCastException: test.spark.SampleTest$A cannot be cast to java.lang.Long

    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:221)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$LongConverter$.toScalaImpl(CatalystTypeConverters.scala:367)

最佳答案

问题是这里没有转换。当您创建 Row它可以接受任意 Object 。它按原样放置。所以它不等于 DataFrame创作:

spark.createDataFrame(rdd, Counter.class); 

Dataset<Counter>创作:

Encoder<Counter> encoder = Encoders.bean(Counter.class);
spark.createDataset(rdd, encoder);

使用 Bean 类时。

所以RowFactory::create只是在这里不适用。如果您想通过RDD<Row>所有值都应该已经以可直接与 DataFrame 一起使用的形式表示与 required type mapping 。这意味着您必须显式映射每个 CounterRow形状如下:

Row(vid, bytes, List(Row(id1, count1), ..., Row(idN, countN))

你的代码应该相当于:

JavaRDD<Row> rows = counters.map((Function<Counter, Row>) cnt -> {
  return RowFactory.create(
    cnt.vid, cnt.bytes,
    cnt.blist.stream().map(b -> RowFactory.create(b.id, b.count)).toArray()
  );
});

Dataset<Row> df = sqlContext.createDataFrame(rows, getSchema());

关于apache-spark - 将类型化 JavaRDD 转换为行 JavaRDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40063996/

相关文章:

apache-spark - "Stage Skipped"在 Apache Spark Web UI 中意味着什么?

python - 如何展平 PySpark 中的嵌套列表?

r - 来自 sparklyr 的非默认方案(数据库)中的访问表

python - 使用 pyarrow 时 Spark 拒绝创建空数据框

python - pandas - 涉及分类分组的最近值查找

r - 在多列中搜索字符串

scala - Spark : How to use mapPartition and create/close connection per partition

object - 在多台机器上使用 spark-submit 运行 spark 项目时得到 "java.lang.NoClassDefFoundError"

R. 处理导入的 Stata 文件中的日期和宽格式

scala - 如何查找 Spark 中每个分区的总和