java - Spark 使用不同的 TTL 写入 Cassandra

标签 java apache-spark cassandra ttl scylla

在 Java Spark 中,我有一个包含“bucket_timestamp”列的数据框,它表示该行所属的存储桶的时间。

我想将数据帧写入 Cassandra 数据库。必须使用 TTL 将数据写入 DB。 TTL 应取决于存储桶时间戳 - 其中每一行的 TTL 应计算为配置。

目前我正在使用常量 TTL 和 spark 写信给 Cassandra,代码如下:

df.write().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "key_space_name");
                    put("table, "table_name");
                    put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
                }
            }).mode(SaveMode.Overwrite).save();

我想到的一种可能的方法是 - 对于每个可能的 bucket_timestamp - 根据时间戳过滤数据,计算 TTL 并将过滤后的数据写入 Cassandra。但这似乎非常低效,而不是 Spark 方式。 Java Spark 中有没有一种方法可以提供 Spark 列作为 TTL 选项,以便每一行的 TTL 都不同?

Solution should be working with Java and dataset:我遇到了一些在 Scala 中使用 RDD 执行此操作的解决方案,但没有找到使用 Java 和 DataFrame 的解决方案。

谢谢!

最佳答案

在 Spark-Cassandra 连接器选项 ( https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java ) 中,您可以将 TTL 设置为:

  • 常量值(withConstantTTL)
  • 自动解析值(withAutoTTL)
  • 基于列的值(withPerRowTTL)

在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则将 TTL 计算为起始 Dataset 的新列。

对于用例,您可以在此处查看测试:https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala#L612

关于java - Spark 使用不同的 TTL 写入 Cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51134344/

相关文章:

java - Android - 使用 GridLayout 垂直滚动

amazon-ec2 - Apache Spark DAGScheduler 缺少舞台的 parent

apache-spark - 为 spark RDD 中的每个键创建唯一值

java - Cassandra Hector API 结果对象

mysql - 将 MySQL 表转换为 Cassandra 中的 ColumnFamily : Slow batch mutations with Hector

cassandra - 无法在 cassandra 上创建索引

java - 如何用 ","分割文本并去掉 java 中的 ","?

java - Tomcat 9 无法加载资源 : the server responded with a status of 404 ()

java - java.io.Buffer* 流与普通流有何不同?

apache-spark - java.lang.AbstractMethodError,org.apache.spark.internal.Logging$class.initializeLogIfNecessary