是否有任何 API 可用于使用 Spark Scala 删除特定的 HBase 单元。我们可以使用 Spark-HBase Connector 进行读写。任何关于删除单元格的建议都是非常值得赞赏的。
最佳答案
这里是使用 Spark 删除 HBase Cell
对象的实现(我使用 parallelize
演示了它,您可以将其调整为您的 Cells RDD)。
总体思路:分块删除 - 迭代每个 RDD 分区,将分区拆分为 10,000 个 Cell 的 block ,将每个 Cell 转换为 HBase Delete
对象,然后调用 table.delete()
从 HBase 执行删除操作。
public void deleteCells(List<Cell> cellsToDelete) {
JavaSparkContext sc = new JavaSparkContext();
sc.parallelize(cellsToDelete)
.foreachPartition(cellsIterator -> {
int chunkSize = 100000; // Will contact HBase only once per 100,000 records
org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME");
Table table;
try {
Connection connection = ConnectionFactory.createConnection(config);
table = connection.getTable(TableName.valueOf(config.get("YOUR-HBASE-TABLE")));
}
catch (IOException e)
{
logger.error("Failed to connect to HBase due to inner exception: " + e);
return;
}
// Split the given cells iterator to chunks
Iterators.partition(cellsIterator, chunkSize)
.forEachRemaining(cellsChunk -> {
List<Delete> deletions = Lists.newArrayList(cellsChunk
.stream()
.map(cell -> new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
.addColumn(cell.getFamily(), cell.getQualifier(), System.currentTimeMillis()))
.iterator());
try {
table.delete(deletions);
} catch (IOException e) {
logger.error("Failed to delete a chunk due to inner exception: " + e);
}
});
});
}
免责声明:这个确切的代码片段未经测试,但我已经使用相同的方法使用 Spark 删除了数十亿个 HBase 单元。
关于apache-spark - 使用spark删除hbase单元,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36183709/