我正在尝试使用 Spark 构建 ETL 流程。我的目标是阅读 Cassandra 表并保存到 parquet 文件中。
到目前为止,我所做的是使用 Cassandra 连接器(在 pyspark 中)从 Cassandra 读取整个表:
df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
.option("table", my_table)\
.option("keyspace",my_keyspace)\
.load()
问题是我的数据正在快速增长,我想每天重复 ETL 过程,从 Cassandra 读取新添加的行并将它们保存到新的 parquet 文件中。
由于我的 Cassandra 表中没有排序,我将无法根据时间进行读取,有什么方法可以从 Spark 端执行此操作吗?
最佳答案
只有当您有基于时间的第一个聚类列时,基于时间的有效过滤才真正可能,如下所示:
create table test.test (
pk1 <type>,
pk2 <type>,
cl1 timestamp,
cl2 ...,
primary key ((pk1, pk2), cl1, cl2));
在本例中,条件为 cl1
,如下所示:
import org.apache.spark.sql.cassandra._
val data = { spark.read.cassandraFormat("test", "test").load()}
val filtered = data.filter("cl1 >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")
将被有效地推送到 Cassandra 中,并且过滤将在服务器端进行,仅检索必要的数据 - 这很容易通过解释进行检查 - 它应该生成类似这样的内容(推送的过滤器表示为 *
) :
// *Filter ((cl1#23 >= 1552228894373000))
// +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk1#21,pk2#22L,cl1#23,...]
PushedFilters: [*GreaterThanOrEqual(cl1,2019-03-10 14:41:34.373)],
ReadSchema: struct<pk1:int,pk2:int,cl1:timestamp,...
在所有其他情况下,过滤将在 Spark 端进行,从 Cassandra 检索所有数据。
关于apache-spark - Spark : daily read from Cassandra and save to parquets, 如何只读取新行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57150948/