scala - Spark 至 Cassandra : Writing Sparse Rows With No Null Values To Cassandra

标签 scala apache-spark cassandra apache-spark-sql spark-cassandra-connector

问:如何仅将 Spark DataFrame 中的值写入 Cassanrda 并高效执行此操作? (高效地使用最少的 Scala 代码行,而不是在 Cassandra 中创建一堆墓碑,使其快速运行等)

我有一个 Cassandra 表,其中包含两个键列和 300 个潜在描述符值。

create table sample {
    key1   text,
    key2   text,
    0      text,
    ............
    299    text,
    PRIMARY KEY (key1, key2)
}

我有一个与基础表匹配的 Spark 数据框,但是 数据帧中的每一行都非常稀疏 - 除了两个键值之外,特定行可能只有 4 到 5 个带有值的“描述符”(第 0->299 列)。

我目前正在将 Spark 数据帧转换为 RDD 并使用 saveRdd 写入数据。

这可行,但是当没有值时,“null”会存储在列中。

例如:

  val saveRdd = sample.rdd

  saveRdd.map(line => (
    line(0), line(1), line(2),
    line(3), line(4), line(5),
    line(6), line(7), line(8),
    line(9), line(10), line(11),
    line(12), line(13), line(14),
    line(15), line(16), line(17),
    line(18), line(19), line(20))).saveToCassandra..........

在 Cassandra 中创建此内容:

XYZ | 10 | 10 49849 | 49849 F | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | TO11142017_进口|空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 20 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |斯科特·迪克·佩迪 | 斯科特·迪克·佩迪空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 2014 年 7 月 13 日 0:00 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 0 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | 8 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 | |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |地点 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空 |地点 |空 |空 |空 |空 |空 |空 |空 |空 |空 |空

在 SparkSession 上设置 Spark.cassandra.output.ignoreNulls 不起作用:

spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")

这也不起作用:

spark-shell  --conf spark.cassandra.output.ignoreNulls=true

(尝试了不同的方法来设置它,但它似乎不起作用)

withColumn 和过滤器似乎不是合适的解决方案。未设置的概念可能是正确的,但不确定在这种情况下如何使用它。

Cassandra .3.11.2

spark-cassandra-connector:2.3.0-s_2.11

Spark 2.2.0.2.6.3.0-235

谢谢!

最佳答案

您确定 ignoreNulls 不适合您吗?当给定单元格中没有值时,Cassandra 输出 null。您可以使用 sstabledump 工具检查数据是否真正写入 SSTable - 您肯定会看到附加了删除信息的单元格(这就是存储 null 的方式)。

下面是在没有 ignoreNulls(默认)的情况下运行 Spark 的示例,并且将 ignoreNulls 设置为 true。测试是在 DSE 5.1.11 上完成的,该版本具有旧版本的连接器,但与 Cassandra 3.11 匹配。

让我们创建一个这样的测试表:

create table test.t3 (id int primary key, t1 text, t2 text, t3 text);

没有 ignoreNulls - 我们需要以下代码进行测试:

case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(new T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")

如果我们使用 cqlsh 查看数据,我们将看到以下内容:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

(1 rows)

完成nodetoollush后,我们可以查看SSTables。这就是我们将在这里看到的内容:

>sstabledump mc-1-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 30,
        "liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
        "cells" : [
          { "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          },
          { "name" : "t2", "value" : "t2" },
          { "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          }
        ]
      }
    ]
  }
]

您可以看到,对于空值列 t1t3,有一个字段 deletion_info

现在,让我们使用 TRUNCATE test.t3 删除数据,然后再次启动 Spark-shell,并将 ignoreNulls 设置为 true:

dse spark --conf spark.cassandra.output.ignoreNulls=true

执行相同的 Spark 代码后,我们将在 cqlsh 中看到相同的结果:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

但是执行flush后,sstabledump显示完全不同的图片:

>sstabledump mc-3-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 27,
        "liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
        "cells" : [
          { "name" : "t2", "value" : "t2" }
        ]
      }
    ]
  }
]

如您所见,我们只有 t2 列的数据,没有提及为空的 t3t1 列。

关于scala - Spark 至 Cassandra : Writing Sparse Rows With No Null Values To Cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53162033/

相关文章:

java - Scala、SparkLauncher无法运行程序 "/etc/spark/conf.cloudera.CD-SPARK_ON_YARN-brkvSOzr/yarn-conf/topology.py"

mysql - 玩Scala Anorm一对多关系

python - PySpark 序列化 EOFError

apache-spark - 如何指定多个Spark Standalone master(对于spark.master属性)?

cassandra - select count(*) 在 Cassandra 中遇到超时问题

scala - 在Spark中舍入为Double

scala - 在 EMR 上烫伤 : Hadoop job fails with NoSuchMethodError: scala. Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

apache-spark - SparkR write.df 合并为一个文件

hadoop - rdd.saveAsTextFile 似乎不起作用,但重复抛出 FileAlreadyExistsException

java - 带有 ConnectionInitException 的 java WARN 消息的 Spark Cassandra Connector