java - Spark 使用编码器创建数据集,其中 row 是数组类型

标签 java apache-spark apache-spark-sql

我无法弄清楚编码器从 RDD 下面创建数据集的正确实现?

例如

JavaRDD<Integer[]>rdd= sparkContext.parallelize(
                Arrays.asList(new Integer[][]{new Integer[]{1,2},
                new Integer[]{3,4}
                ,new Integer[]{6,7}}));

以下实现失败-
DataSet<Integer> ds = sqlContext.createDataset(rdd.rdd(),Encoders.bean(Integer[].class));

Exception in thread "main" java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:90) at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142) at org.apache.spark.sql.Encoders.bean(Encoders.scala)



如何为数组类型创建编码?谢谢!

最佳答案

请尝试先将其转换为 DataFrame,然后再转换为 Dataset

Dataset<Integer[]> dataFrame = sqlContext.createDataFrame(rdd, Integer[].class);
Dataset<Integer[]> ds = dataFrame.as(Encoders.bean(Integer[].class));

关于java - Spark 使用编码器创建数据集,其中 row 是数组类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46310783/

相关文章:

java - Spring @Transactional 提交失败; Deby + Eclipse链接

python - 在 Spark 数据框中生成可重复的唯一 ID

scala - 如何使用toDF创建带有空值的DataFrame?

java - Spark on YARN - 任务运行速度非常慢

hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?

java - 对 WeakReference.get() 对象的硬引用会导致内存泄漏吗?

java - 带滚动条的 Jfreechart

java - Actor LibGDX 的移动

java - 如何在spark中将List<Objects>的byte[]解码为Dataset<Row>?

python - 使用 Python SDK 在 Spark 上运行 Apache Beam wordcount 管道时并行度低