scala - 如何更改 Spark 数据集上的架构

标签 scala apache-spark encoder apache-spark-dataset

当我在 Spark 2 中检索数据集时,使用 select 语句,底层列继承了查询列的数据类型。

val ds1 = spark.sql("select 1 as a, 2 as b, 'abd' as c")

ds1.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

现在,如果我将其转换为案例类,它将正确转换值,但底层模式仍然是错误的。

case class abc(a: Double, b: Double, c: String)
val ds2 = ds1.as[abc]
ds2.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

ds2.collect
res18: Array[abc] = Array(abc(1.0,2.0,abd))

我“应该”能够在创建第二个数据集时指定要使用的编码器,但 scala 似乎忽略了这个参数(这是 BUG 吗?):

val abc_enc = org.apache.spark.sql.Encoders.product[abc]

val ds2 = ds1.as[abc](abc_enc)

ds2.printSchema
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

所以我能看到的唯一方法是使用 createDataset,无需非常复杂的映射即可简单地执行此操作,但这需要对底层对象进行收集,因此它并不理想。

val ds2 = spark.createDataset(ds1.as[abc].collect)

最佳答案

这是 Spark API 中的一个未解决问题(查看此票证 SPARK-17694)

所以您需要做的是进行额外的显式转换。这样的事情应该有效:

ds1.as[abc].map(x => x : abc)

关于scala - 如何更改 Spark 数据集上的架构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45162747/

相关文章:

apache-spark - 使用日期字段对 Dataframe 进行 Spark 分区并在每个分区上运行算法

python - 如果列数不同,如何处理从源 Spark df 到 Hive 表的插入

linux - 为 Linux 开发

c++ - 在我的文本编码器程序中找不到错误

scala - 测试给定实例是否是 Scala 中给定类的(子)类

scala - 如何访问案例类字段字段的字符串名称中的值

scala - 对测试用例进行分组或规范 2 中的 "shared_examples_for"等效项

function - scala匿名函数问题

apache-spark - 发生异常 : pyspark. sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

ubuntu - 为 ubuntu 16.04 安装 FFMPEG 的所有编码器?