scala - Cassandra spark 连接器编写嵌套的可选案例类

标签 scala cassandra apache-spark spark-cassandra-connector

如何使用 cassandra spark 连接器编写可选的案例类?

例子:

case class User(name : String, address : Option[Address])
case class Address(street : String, city : String)

当我尝试使用 rdd.saveToCassandra 将用户保存到 cassandra 时,它引发了错误

Failed to get converter for field "address" of type scala.Option[Address] in User mapped to column "address" of "testspark.logs_raw"

我已经尝试实现一个 TypeConverter 但没有奏效。

然而,嵌套的案例类被正确地转换为 cassandra UDT 并且可选字段被接受。

有什么好的方法可以在不更改数据模型的情况下解决这个问题?

最佳答案

只是为了可见性。在现代版本中一切正常——围绕 SCC 1.4.0-1.6.0 的 UDT 有很多变化,加上 SCC 2.0.8 中的许多性能优化。使用 SCC 2.5.1,RDD API 可以正确映射所有内容——例如,如果我们有以下 UDT 和表:

cqlsh> create type test.address (street text, city text);
cqlsh> create table test.user(name text primary key, address test.address);
cqlsh> insert into test.user(name, address) values 
   ('with address', {street: 'street 1', city: 'city1'});
cqlsh> insert into test.user(name) values ('without address');
cqlsh> select * from test.user;

 name            | address
-----------------+-------------------------------------
    with address | {street: 'street 1', city: 'city1'}
 without address |                                null

(2 rows)

然后 RDD API 能够在读取数据时正确地提取所有内容:

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Address(street : String, city : String)
defined class Address

scala> case class User(name : String, address : Option[Address])
defined class User

scala> val data = sc.cassandraTable[User]("test", "user")
data: com.datastax.spark.connector.rdd.CassandraTableScanRDD[User] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18

scala> data.collect
res0: Array[User] = Array(User(without address,None), 
   User(with address,Some(Address(street 1,city1))))

关于scala - Cassandra spark 连接器编写嵌套的可选案例类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32231675/

相关文章:

scala - 在 Scala 中对数组数组执行元素添加的惯用方法是什么

json - 单个字段类的 Scala json 解析

java.lang.NoClassDefFoundError : org/apache/spark/Logging 错误

java - Databricks Spark 笔记本在运行之间重复使用 Scala 对象?

c# - 如何从 C# 中引用带有美元符号的标识符?

scala - 将现有的 sbt 项目导入 IntelliJ

mysql - 为什么我的 Cassandra 数据库读取数据速度太慢?想要在 10 秒内读取 100,000 行

plist - 将可选程序参数添加到 plist 的正确方法是什么?

apache-spark - Spark Structured Stream 只从 Kafka 的一个分区获取消息

python - 当使用 hbase 作为数据源时,spark 是否利用 hbase 键的排序顺序