我正在使用 Apache Spark和 Cassandra ,我想用 spark-cassandra-connector 将我的 RDD 保存到 Cassandra .
代码如下:
def saveToCassandra(step: RDD[(String, String, Date, Int, Int)]) = {
step.saveToCassandra("keyspace", "table")
}
这在大多数情况下都可以正常工作,但会覆盖数据库中已经存在的数据。我不想覆盖任何数据。这有可能吗?
最佳答案
我的做法是:
rdd.foreachPartition(x => connector.WithSessionDo(session => {
someUpdater.UpdateEntries(x, session)
// or
x.foreach(y => someUpdater.UpdateEntry(y, session))
}))
上面的连接器
是CassandraConnector(sparkConf)
。
它不如简单的 saveToCassandra
好用,但它允许细粒度控制。
关于cassandra - 如何将 RDD 插入(不保存或更新)到 Cassandra 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27568513/