apache-spark - Spark : daily read from Cassandra and save to parquets, 如何只读取新行?

标签 apache-spark cassandra spark-cassandra-connector

我正在尝试使用 Spark 构建 ETL 流程。我的目标是阅读 Cassandra 表并保存到 parquet 文件中。

到目前为止,我所做的是使用 Cassandra 连接器(在 pyspark 中)从 Cassandra 读取整个表:

df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
        .option("table", my_table)\
        .option("keyspace",my_keyspace)\
        .load()

问题是我的数据正在快速增长,我想每天重复 ETL 过程,从 Cassandra 读取新添加的行并将它们保存到新的 parquet 文件中。

由于我的 Cassandra 表中没有排序,我将无法根据时间进行读取,有什么方法可以从 Spark 端执行此操作吗?

最佳答案

只有当您有基于时间的第一个聚类列时,基于时间的有效过滤才真正可能,如下所示:

create table test.test (
  pk1 <type>,
  pk2 <type>,
  cl1 timestamp,
  cl2 ...,
  primary key ((pk1, pk2), cl1, cl2));

在本例中,条件为 cl1,如下所示:

import org.apache.spark.sql.cassandra._
val data = { spark.read.cassandraFormat("test", "test").load()}
val filtered = data.filter("cl1 >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")

将被有效地推送到 Cassandra 中,并且过滤将在服务器端进行,仅检索必要的数据 - 这很容易通过解释进行检查 - 它应该生成类似这样的内容(推送的过滤器表示为 *) :

// *Filter ((cl1#23 >= 1552228894373000))
// +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk1#21,pk2#22L,cl1#23,...] 
PushedFilters: [*GreaterThanOrEqual(cl1,2019-03-10 14:41:34.373)], 
ReadSchema: struct<pk1:int,pk2:int,cl1:timestamp,...

在所有其他情况下,过滤将在 Spark 端进行,从 Cassandra 检索所有数据。

关于apache-spark - Spark : daily read from Cassandra and save to parquets, 如何只读取新行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57150948/

相关文章:

elasticsearch - Pyspark使用saveAsNewAPIHadoopFile将DStream数据写入Elasticsearch

apache-spark - VectorUDT 用法

cassandra - 为Cassandra中的每个分区键获取第一行

dataframe - Spark Dataframe.cache() 更改源的行为

scala - Spark 至 Cassandra : Writing Sparse Rows With No Null Values To Cassandra

scala - 在 Spark 作业中写入 HBase : a conundrum with existential types

apache-spark - spark Dataset 中的类型安全是什么意思?

php - 如何将 Cassandra 与 PHP 连接起来

database - 有没有办法在Golang中实现cassandra "decimal"数据类型

scala - 从 Scala 中的数组中删除方括号 []