我正在尝试使用以下方法过滤巨大的 Cassandra 表的一小部分:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
我想将 cassandra 表中的行映射到作为分区键一部分的“创建”列上。
我的表键(表的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
结果报错:
[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey. [error] Unspecified value parameter created. [error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") [error] ^ [error] one error found [error] (compile:compile) Compilation failed
它只处理分区键中的一个对象,如 Documentation .
为什么有多个分区键的问题?-回答。
编辑:我尝试以正确的形式使用 joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
当我尝试在 Spark 上运行它时,没有错误,但它永远停留在“[stage 0:> (0+2)/2]”...
出了什么问题?
最佳答案
错误告诉您类 TableKey
需要 3 个组件来初始化,但只传递了一个参数。这是 Scala 编译错误,与 C* 或 Spark 无关。
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail
.joinWithCassandraTable("listener","snapshots_test_b")
但总的来说,C* 使用整个
partition key
确定特定行所在的位置。因此,只有在您了解整个 partition key
的情况下才能有效地提取数据。所以只传递其中的一部分没有值(value)。joinWithCassandraTable 需要完整的
partition key
值,因此它可以有效地完成它的工作。如果您只有 parition key
的一部分您将需要执行全表扫描并使用 Spark 进行过滤。如果您只想基于
clustering column
进行过滤您可以通过按下 where
来实现。 C* 的子句,例如sc.cassandraTable("ks","test").where("clustering_key > someValue")
关于scala - Spark joinWithCassandraTable() 映射多个分区键错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31773477/