问:如何仅将 Spark DataFrame 中的值写入 Cassanrda 并高效执行此操作? (高效地使用最少的 Scala 代码行,而不是在 Cassandra 中创建一堆墓碑,使其快速运行等)
我有一个 Cassandra 表,其中包含两个键列和 300 个潜在描述符值。
create table sample {
key1 text,
key2 text,
0 text,
............
299 text,
PRIMARY KEY (key1, key2)
}
我有一个与基础表匹配的 Spark 数据框,但是 数据帧中的每一行都非常稀疏 - 除了两个键值之外,特定行可能只有 4 到 5 个带有值的“描述符”(第 0->299 列)。
我目前正在将 Spark 数据帧转换为 RDD 并使用 saveRdd 写入数据。
这可行,但是当没有值时,“null”会存储在列中。
例如:
val saveRdd = sample.rdd
saveRdd.map(line => (
line(0), line(1), line(2),
line(3), line(4), line(5),
line(6), line(7), line(8),
line(9), line(10), line(11),
line(12), line(13), line(14),
line(15), line(16), line(17),
line(18), line(19), line(20))).saveToCassandra..........
在 Cassandra 中创建此内容:
XYZ | 10 | 10 49849 | 49849 F | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | TO11142017_进口|空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 20 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |斯科特·迪克·佩迪 | 斯科特·迪克·佩迪空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 2014 年 7 月 13 日 0:00 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 0 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 8 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |地点 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |地点 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空
在 SparkSession 上设置 Spark.cassandra.output.ignoreNulls 不起作用:
spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")
这也不起作用:
spark-shell --conf spark.cassandra.output.ignoreNulls=true
(尝试了不同的方法来设置它,但它似乎不起作用)
withColumn
和过滤器似乎不是合适的解决方案。未设置的概念可能是正确的,但不确定在这种情况下如何使用它。
Cassandra .3.11.2
spark-cassandra-connector:2.3.0-s_2.11
Spark 2.2.0.2.6.3.0-235
谢谢!
最佳答案
您确定 ignoreNulls
不适合您吗?当给定单元格中没有值时,Cassandra 输出 null
。您可以使用 sstabledump
工具检查数据是否真正写入 SSTable - 您肯定会看到附加了删除信息的单元格(这就是存储 null 的方式)。
下面是在没有 ignoreNulls
(默认)的情况下运行 Spark 的示例,并且将 ignoreNulls
设置为 true
。测试是在 DSE 5.1.11 上完成的,该版本具有旧版本的连接器,但与 Cassandra 3.11 匹配。
让我们创建一个这样的测试表:
create table test.t3 (id int primary key, t1 text, t2 text, t3 text);
没有 ignoreNulls
- 我们需要以下代码进行测试:
case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(new T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")
如果我们使用 cqlsh
查看数据,我们将看到以下内容:
cqlsh:test> SELECT * from test.t3;
id | t1 | t2 | t3
----+------+----+------
1 | null | t2 | null
(1 rows)
完成nodetoollush
后,我们可以查看SSTables。这就是我们将在这里看到的内容:
>sstabledump mc-1-big-Data.db
[
{
"partition" : {
"key" : [ "1" ],
"position" : 0
},
"rows" : [
{
"type" : "row",
"position" : 30,
"liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
"cells" : [
{ "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
},
{ "name" : "t2", "value" : "t2" },
{ "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
}
]
}
]
}
]
您可以看到,对于空值列 t1
和 t3
,有一个字段 deletion_info
。
现在,让我们使用 TRUNCATE test.t3
删除数据,然后再次启动 Spark-shell,并将 ignoreNulls
设置为 true:
dse spark --conf spark.cassandra.output.ignoreNulls=true
执行相同的 Spark 代码后,我们将在 cqlsh
中看到相同的结果:
cqlsh:test> SELECT * from test.t3;
id | t1 | t2 | t3
----+------+----+------
1 | null | t2 | null
但是执行flush后,sstabledump
显示完全不同的图片:
>sstabledump mc-3-big-Data.db
[
{
"partition" : {
"key" : [ "1" ],
"position" : 0
},
"rows" : [
{
"type" : "row",
"position" : 27,
"liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
"cells" : [
{ "name" : "t2", "value" : "t2" }
]
}
]
}
]
如您所见,我们只有 t2
列的数据,没有提及为空的 t3
和 t1
列。
关于scala - Spark 至 Cassandra : Writing Sparse Rows With No Null Values To Cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53162033/