apache-spark - 使用目录引发 cassandra 连接器问题

标签 apache-spark cassandra spark-cassandra-connector

我正在按照说明操作 found here连接我的 spark 程序以从 Cassandra 读取数据。这是我配置 Spark 的方式:

val configBuilder = SparkSession.builder
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.cassandra.connection.host", cassandraUrl)
  .config("spark.cassandra.connection.port", 9042)
  .config("spark.sql.catalog.myCatalogName", "com.datastax.spark.connector.datasource.CassandraCatalog")

根据文档,一旦完成,我应该能够像这样查询 Cassandra:
spark.sql("select * from myCatalogName.myKeyspace.myTable where myPartitionKey = something")
但是,当我这样做时,我收到以下错误消息:
mismatched input '.' expecting <EOF>(line 1, pos 43)

== SQL ==
select * from myCatalog.myKeyspace.myTable where myPartitionKey = something
----------------------------------^^^

当我尝试以下格式时,我成功地从 Cassandra 检索条目:
val frame = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "myKeyspace", "table" -> "myTable"))
  .load()
  .filter(col("timestamp") > startDate && col("timestamp") < endDate)

但是,此查询需要执行全表扫描。该表包含几百万个条目,我更愿意利用谓词下推功能,这似乎只能通过 SQL API 获得。

我正在使用 spark-core_2.11:2.4.3、spark-cassandra-connector_2.11:2.5.0 和 Cassandra 3.11.6

谢谢!

最佳答案

Catalogs API 仅在尚未发布的 SCC 3.0 版中可用。它将随 Spark 3.0 版本一起发布,因此它在 SCC 2.5.0 中不可用。因此,对于 2.5.0,您需要使用 create or replace temporary view... 显式注册您的表。 , 如 described in docs :

spark.sql("""CREATE TEMPORARY VIEW myTable
     USING org.apache.spark.sql.cassandra
     OPTIONS (
     table "myTable",
     keyspace "myKeyspace",
     pushdown "true")""")

关于下推(它们对所有数据帧 API、SQL、Scala、Python 等的工作方式相同) - 当您的 timestamp 时会发生这种过滤。是第一个聚类列。即使在这种情况下,典型的问题是您可能会指定 startDateendDate作为字符串,而不是时间戳。您可以通过执行 frame.explain 来检查,并检查谓词是否被下推 - 它应该有 *谓词名称附近的标记。

例如,

val data = spark.read.cassandraFormat("sdtest", "test").load()
val filtered = data.filter("ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")
val not_filtered = data.filter("ts >= '2019-03-10T14:41:34.373+0000' AND ts <= '2019-03-10T19:01:56.316+0000'")

第一个filter表达式将向下推谓词,而第二个( not_filtered )将需要完整扫描。

关于apache-spark - 使用目录引发 cassandra 连接器问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62374126/

相关文章:

text - Cassandra WHERE - 比较不区分大小写

apache-spark - spark datasax cassandra 连接器从沉重的 cassandra 表读取速度慢

cassandra - 从 cassandra 集合中减去 (-) 是否会创建墓碑?

java - 以最佳方式计算 JavaRDD 的统计信息

scala - Spark 至 Cassandra : Writing Sparse Rows With No Null Values To Cassandra

apache-spark - 如何在执行程序运行的地方使用长生命周期的昂贵的实例化实用程序服务?

python - Spark RDD 到 DataFrame python

scala - 在 Spark-SQL 中避免 SQL 注入(inject)的首选方法是什么(在 Hive 上)

python - Pyspark - 获取具有条件的列的累计总和

ubuntu - Cassandra 3.0.12 - nofile 限制是否足够? : false