scala - 如何避免在自定义数据源中对数组中的字节进行装箱?

标签 scala apache-spark apache-spark-sql

我正在处理自定义 Spark 数据源,并希望架构包含一行原始字节数组类型。

我的问题是生成的字节数组中的字节被装箱:输出类型为 WrappedArray$ofRef .这意味着每个字节都表示为一个 java.lang.Object。虽然我可以解决这个问题,但我担心计算和内存开销,这对我的应用程序至关重要。我真的只想要原始数组!

下面是一个演示此行为的最小示例。

class DefaultSource extends SchemaRelationProvider with DataSourceRegister {

    override def shortName(): String = "..."

    override def createRelation(
                                    sqlContext: SQLContext,
                                    parameters: Map[String, String],
                                    schema: StructType = new StructType()
                               ): BaseRelation = {
        new DefaultRelation(sqlContext)
    }
}

class DefaultRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

    override def schema = {
        StructType(
            Array(
                StructField("key", ArrayType(ByteType))
            )
        )
    }

    override def buildScan(
                              requiredColumnNames: Array[String],
                              filterArr: Array[Filter]
                          ): RDD[Row] = {
        testRDD
    }

    def testRDD = sqlContext.sparkContext.parallelize(
        List(
            Row(
                Array[Byte](1)
            )
        )
    )
}

使用此示例数据源如下:
def schema = StructType(Array(StructField("key", ArrayType(ByteType))))
val rows = sqlContext
        .read
        .schema(schema)
        .format("testdatasource")
        .load
        .collect()
println(rows(0)(0).getClass)

然后生成以下输出:
class scala.collection.mutable.WrappedArray$ofRef

在调试器中进一步检查结果类型确认 WrappedArray 中的字节确实被装箱 - 由于某种原因,它们的类型一直被删除到 java.lang.Object (而不是 java.lang.Byte )。

请注意,直接使用 RDD,而不通过数据源 API,会导致原始字节数组的预期结果。

任何有关如何解决此问题的建议将不胜感激。

最佳答案

好的,对于原始字节数组,我应该使用 BinaryType而不是 Array(Byte)作为列类型。这解决了问题。

出于好奇,如果我们改变ArrayType(ByteType)例如ArrayType(LongType)在上面的例子中,我们实际上得到了一个运行时异常,表明需要装箱的 long。因此,似乎 Spark SQL 数组中的原语总是装箱的。

关于scala - 如何避免在自定义数据源中对数组中的字节进行装箱?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41546922/

相关文章:

scala - Akka——类型不匹配; [错误] 发现 : Unit [error] required: scala. sys.process.ProcessLogger

scala - 将 scala 2.10 future 转换为 scalaz.concurrent.Future//任务

apache-spark - Apache Spark 在本地模式下性能下降

apache-spark - 如何对多个 Spark 作业并行执行多个 Kafka 主题

java - Spark Java - 如何迭代数据帧 Dataset<Row> 中的行,并将一列的值添加到 Arraylist

list - 如何使用 Scala 从列表中创建单独的列表?

scala - com.earldouglas#xsbt-web-plugin 在哪里发布?

hadoop - 如何按顺序从Apache Spark向Kafka发送消息主题

apache-spark - 如何均匀分布数据集以避免倾斜连接(和长时间运行的任务)?

apache-spark - Delta Lake 中的外部表与内部表