scala - Spark : GenericMutableRow cannot be cast to java. lang.Byte 和 <none> 不是一个术语

标签 scala apache-spark cassandra spark-cassandra-connector

我正在尝试将数据从 Spark (v.1.6) DataFrame 存储到 Cassandra。我设法获得了 Spark Cassandra Connector 上的示例页面正在运行,但是,我被自己的代码困住了。考虑以下代码片段:

case class MyCassandraRow(id : Long, sfl : Seq[Float])

df.map(r => 
   MyCassandraRow(
        r.getAsLong(0),
        r.getAs[MySeqFloatWrapper]("sfl").getSeq())
).saveToCassandra("keyspace", "table")

MySeqFloatWrapper 是我编写的一个 UDT,它有一个 getSeq() 方法并返回一个 Seq[Float]

不幸的是,如果我运行这段代码,我会得到一个

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast to java.lang.Byte

实际上,即使我这样做(并且如果我这样做Row(r.getLong(0))),我也会得到这个异常,但如果我写:

df.map(r => r.getLong(0)).collect()

另一方面,如果我在案例类周围添加一个 Row 并编写

df.map(r => 
  Row(
   MyCassandraRow(
        r.getAsLong(0),
        r.getAs[MySeqFloatWrapper]("sfl").getSeq())
  )
).saveToCassandra("keyspace", "table")

我收到以下异常:

scala.ScalaReflectionException: <none> is not a term

最佳答案

我刚刚意识到 ClassCastException 与我的 UDT MySeqFloatWrapper 和我在那里定义的 sqlType 相关,到目前为止显然还没有考虑到这一点与 Spark 1.5 一样,它工作得很好,但 Spark 1.6 就不再工作了(另请参阅 SPARK-12878 )。

如果您需要模板来了解如何正确定义 UDT,另请参阅 github 上的 VectorUDT 示例。 .

关于scala - Spark : GenericMutableRow cannot be cast to java. lang.Byte 和 <none> 不是一个术语,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36030908/

相关文章:

scala - 使用 FunSuite 测试 Spark 抛出 NullPointerException

python - Spark 属性错误 : 'SparkContext' object has no attribute 'map'

regex - 在Scala中删除标点符号形式的文本-Spark

cassandra - Cassandra 中的本地传输请求是什么?

java - 使用java模拟多个用户连接到Cassandra

scala - 如何 groupBy 迭代器而不将其转换为 Scala 中的列表?

java - 类似 MQ 的 Java 库/框架,用于处理具有容错功能的大数据并重新注入(inject)大消息

apache-spark - 为什么 Spark 以 exitCode : 16? 退出

cassandra将数据从一个列族复制到另一列族

scala - 如何指定Specs2测试的执行顺序?