apache-spark - 使用spark删除hbase单元

标签 apache-spark hbase

是否有任何 API 可用于使用 Spark Scala 删除特定的 HBase 单元。我们可以使用 Spark-HBase Connector 进行读写。任何关于删除单元格的建议都是非常值得赞赏的。

最佳答案

这里是使用 Spark 删除 HBase Cell 对象的实现(我使用 parallelize 演示了它,您可以将其调整为您的 Cells RDD)。

总体思路:分块删除 - 迭代每个 RDD 分区,将分区拆分为 10,000 个 Cell 的 block ,将每个 Cell 转换为 HBase Delete 对象,然后调用 table.delete() 从 HBase 执行删除操作。

public void deleteCells(List<Cell> cellsToDelete) {

    JavaSparkContext sc = new JavaSparkContext();

    sc.parallelize(cellsToDelete)
        .foreachPartition(cellsIterator -> {
            int chunkSize = 100000; // Will contact HBase only once per 100,000 records

            org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
            config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME");

            Table table;

            try {
                Connection connection = ConnectionFactory.createConnection(config);
                table = connection.getTable(TableName.valueOf(config.get("YOUR-HBASE-TABLE")));
            }
            catch (IOException e)
            {
                logger.error("Failed to connect to HBase due to inner exception: " + e);

                return;
            }

            // Split the given cells iterator to chunks
            Iterators.partition(cellsIterator, chunkSize)
                .forEachRemaining(cellsChunk -> {
                    List<Delete> deletions = Lists.newArrayList(cellsChunk
                            .stream()
                            .map(cell -> new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                                    .addColumn(cell.getFamily(), cell.getQualifier(), System.currentTimeMillis()))
                            .iterator());

                    try {
                        table.delete(deletions);
                    } catch (IOException e) {
                        logger.error("Failed to delete a chunk due to inner exception: " + e);
                    }
                });

        });
}

免责声明:这个确切的代码片段未经测试,但我已经使用相同的方法使用 Spark 删除了数十亿个 HBase 单元。

关于apache-spark - 使用spark删除hbase单元,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36183709/

相关文章:

python - Spark : How to use HBase filter e. g QualiferFilter by python-api

java - 如何为 Google Cloud Bigtable 进行连接池

python - PySpark - 检查字符串列是否包含字符串列表中的单词并提取它们

python - pyLDAvis可视化pyspark生成的LDA模型

apache-spark - Apache Spark 中的 Printschema()

apache-spark - PySpark:根据其他三列的最大值查找一列的值

java - 无法连接到受 Kerberos 保护的 Phoenix 数据源

scala - hbase-spark 加载数据引发 NullPointerException 错误(scala)

spring - orm.CompilationManager 问题 : java compilation Error in Spring

python - PySpark 可以使用 numpy 数组吗?